diff options
Diffstat (limited to 'lib/aclog/receiver/collector_connection.rb')
-rw-r--r-- | lib/aclog/receiver/collector_connection.rb | 58 |
1 files changed, 13 insertions, 45 deletions
diff --git a/lib/aclog/receiver/collector_connection.rb b/lib/aclog/receiver/collector_connection.rb index 8e80b1b..0ee8e6c 100644 --- a/lib/aclog/receiver/collector_connection.rb +++ b/lib/aclog/receiver/collector_connection.rb @@ -16,18 +16,15 @@ module Aclog _cr = -> bl { bl.call; @@queue.pop &_cr } EM.defer { @@queue.pop &_cr } end - - @@saved_tweets ||= [] end def send_account(account) - send_object( - type: "account", - id: account.id, - oauth_token: account.oauth_token, - oauth_token_secret: account.oauth_token_secret, - user_id: account.user_id, - consumer_version: account.consumer_version) + send_object(type: "account", + id: account.id, + oauth_token: account.oauth_token, + oauth_token_secret: account.oauth_token_secret, + user_id: account.user_id, + consumer_version: account.consumer_version) Rails.logger.debug("Sent #{account.id}/#{account.user_id}") end @@ -36,8 +33,8 @@ module Aclog end def unbind + @connections.reject! {|k, v| v == self } Rails.logger.info("Connection closed(#{@worker_number})") - @connections.delete_if {|k, v| v == self } end def receive_data(data) @@ -126,44 +123,18 @@ module Aclog # unregister end - def receive_user(msg) - @@queue.push -> do - Rails.logger.debug("Received User(#{@worker_number}): #{msg["id"]}") - User.from_hash(:id => msg["id"], - :screen_name => msg["screen_name"], - :name => msg["name"], - :profile_image_url => msg["profile_image_url"], - :protected => msg["protected"]) - end - end - def receive_tweet(msg) @@queue.push -> do Rails.logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}") - unless @@saved_tweets.include?(msg["id"]) - @@saved_tweets << msg["id"] - if @@saved_tweets.size > 100000 - Rails.logger.debug("Tweet id dropped from cache: #{@@saved_tweets.shift}") - end - - Tweet.from_hash(:id => msg["id"], - :text => msg["text"], - :source => msg["source"], - :tweeted_at => Time.parse(msg["tweeted_at"]), - :user_id => msg["user_id"]) - else - Rails.logger.debug("Tweet already exists(#{@worker_number}): #{msg["id"]}") - end + Tweet.from_receiver(msg) end end def receive_favorite(msg) @@queue.push -> do Rails.logger.debug("Receive Favorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}") - f = Favorite.from_hash(:tweet_id => msg["tweet_id"], - :user_id => msg["user_id"]) - if Settings.notification.enabled && t = Tweet.find_by(id: msg["tweet_id"]) - Notification.notify_favorite(t) + if f = Favorite.from_receiver(msg) + f.tweet.notify_favorite end end end @@ -171,24 +142,21 @@ module Aclog def receive_unfavorite(msg) @@queue.push -> do Rails.logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}") - Favorite.delete_from_hash(:tweet_id => msg["tweet_id"], - :user_id => msg["user_id"]) + Favorite.delete_from_receiver(msg) end end def receive_retweet(msg) @@queue.push -> do Rails.logger.debug("Receive Retweet(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}") - Retweet.from_hash(:id => msg["id"], - :tweet_id => msg["tweet_id"], - :user_id => msg["user_id"]) + Retweet.from_receiver(msg) end end def receive_delete(msg) @@queue.push -> do Rails.logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}") - Tweet.delete_from_id(msg["id"]) + Tweet.delete_from_receiver(msg) end end end |