aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorre4k <re4k@re4k.info>2013-04-01 03:59:33 +0900
committerre4k <re4k@re4k.info>2013-04-01 03:59:33 +0900
commit90d8845257404bb445d7611f9e9c7828ec25dc34 (patch)
tree6f3d19afa530f4e9d502e55c36d64202c2878ac5 /lib
parent27bf5349f62ce11763441eb2aa8e69ee10bf9987 (diff)
downloadaclog-90d8845257404bb445d7611f9e9c7828ec25dc34.tar.gz
Add importing favs feature(partial: only status_activity).
Refactor models
Diffstat (limited to 'lib')
-rw-r--r--lib/receiver/worker.rb99
1 files changed, 43 insertions, 56 deletions
diff --git a/lib/receiver/worker.rb b/lib/receiver/worker.rb
index 87b55da..345ad73 100644
--- a/lib/receiver/worker.rb
+++ b/lib/receiver/worker.rb
@@ -10,11 +10,6 @@ end
class Receiver::Worker < DaemonSpawn::Base
class DBProxyServer < EM::Connection
- @@wq = EM::WorkQueue::WorkQueue.new do |arg|
- arg.call
- end
- @@wq.start
-
def send_account_all
Account.where("id % ? = ?", Settings.worker_count, @worker_number).each do |account|
puts "Sent #{account.id}/#{account.user_id}"
@@ -34,8 +29,17 @@ class Receiver::Worker < DaemonSpawn::Base
def initialize
$connections ||= {}
+
@worker_number = nil
@pac = MessagePack::Unpacker.new
+
+ @@saved_tweets ||= []
+ unless defined?(@@wq)
+ @@wq = EM::WorkQueue::WorkQueue.new do |arg|
+ arg.call
+ end
+ @@wq.start
+ end
end
def post_init
@@ -112,32 +116,32 @@ class Receiver::Worker < DaemonSpawn::Base
def receive_user(msg)
@@wq.push -> do
$logger.debug("Received User(#{@worker_number}): #{msg["id"]}")
- begin
- rec = User.find_or_initialize_by(:id => msg["id"])
- rec.screen_name = msg["screen_name"]
- rec.name = msg["name"]
- rec.profile_image_url = msg["profile_image_url"]
- rec.protected = msg["protected"]
- rec.save! if rec.changed?
- rescue
- $logger.error("Unknown error while inserting user: #{$!}/#{$@}")
- end
+ User.from_hash(:id => msg["id"],
+ :screen_name => msg["screen_name"],
+ :name => msg["name"],
+ :profile_image_url => msg["profile_image_url"],
+ :protected => msg["protected"])
end
end
def receive_tweet(msg)
- @@wq.push -> do
- $logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}")
- begin
- Tweet.create!(:id => msg["id"],
- :text => msg["text"],
- :source => msg["source"],
- :tweeted_at => Time.parse(msg["tweeted_at"]),
- :user_id => msg["user_id"])
- rescue ActiveRecord::RecordNotUnique
- $logger.debug("Duplicate Tweet(#{@worker_number}): #{msg["id"]}")
- rescue
- $logger.error("Unknown error while inserting tweet: #{$!}/#{$@}")
+ Rails.logger.silence do
+ @@wq.push -> do
+ $logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}")
+ unless @@saved_tweets.include?(msg["id"])
+ @@saved_tweets << msg["id"]
+ if @@saved_tweets.size > 10000
+ $logger.debug("Tweet ids dropped: #{@@saved_tweets.shift}")
+ end
+
+ Tweet.from_hash(:id => msg["id"],
+ :text => msg["text"],
+ :source => msg["source"],
+ :tweeted_at => Time.parse(msg["tweeted_at"]),
+ :user_id => msg["user_id"])
+ else
+ $logger.debug("Tweet already exists(#{@worker_number}): #{msg["id"]}")
+ end
end
end
end
@@ -145,47 +149,30 @@ class Receiver::Worker < DaemonSpawn::Base
def receive_favorite(msg)
@@wq.push -> do
$logger.debug("Receive Favorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
- begin
- Favorite.create!(:tweet_id => msg["tweet_id"],
+ Favorite.from_hash(:tweet_id => msg["tweet_id"],
:user_id => msg["user_id"])
- rescue ActiveRecord::RecordNotUnique
- $logger.debug("Duplicate Favorite(#{@worker_number}): #{msg["id"]}")
- rescue
- $logger.error("Unknown error while inserting favorite: #{$!}/#{$@}")
- end
end
end
def receive_retweet(msg)
@@wq.push -> do
- $logger.debug("Received Retweet")
- begin
- Retweet.create!(:id => msg["id"],
+ $logger.debug("Receive Retweet(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
+ Retweet.from_hash(:id => msg["id"],
:tweet_id => msg["tweet_id"],
:user_id => msg["user_id"])
- rescue ActiveRecord::RecordNotUnique
- $logger.debug("Duplicate Retweet(#{@worker_number}): #{msg["id"]}")
- rescue
- $logger.error("Unknown error while inserting retweet: #{$!}/#{$@}")
- end
end
end
def receive_delete(msg)
@@wq.push -> do
- begin
- if msg["id"]
- $logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}")
- Tweet.where(:id => msg["id"]).destroy_all
- Retweet.where(:id => msg["id"]).destroy_all
- elsif msg["tweet_id"]
- $logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
- Favorite
- .where("tweet_id = #{msg["tweet_id"]} AND user_id = #{msg["user_id"]}")
- .destroy_all
- end
- rescue
- $logger.error("Unknown error while deleting: #{$!}/#{$@}")
+ if msg["id"]
+ $logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}")
+ Tweet.delete_from_id(msg["id"])
+ Retweet.delete_from_id(msg["id"])
+ elsif msg["tweet_id"]
+ $logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
+ Favorite.delete_from_hash(:tweet_id => msg["tweet_id"],
+ :user_id => msg["user_id"])
end
end
end
@@ -241,7 +228,7 @@ class Receiver::Worker < DaemonSpawn::Base
def initialize(opts = {})
super(opts)
- $logger = Receiver::Logger.new(:info)
+ $logger = Receiver::Logger.new(Rails.env.development? ? :debug : :info)
$connections = {}
end