aboutsummaryrefslogtreecommitdiffstats
path: root/lib/aclog/receiver/collector_connection.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/aclog/receiver/collector_connection.rb')
-rw-r--r--lib/aclog/receiver/collector_connection.rb58
1 files changed, 13 insertions, 45 deletions
diff --git a/lib/aclog/receiver/collector_connection.rb b/lib/aclog/receiver/collector_connection.rb
index 8e80b1b..0ee8e6c 100644
--- a/lib/aclog/receiver/collector_connection.rb
+++ b/lib/aclog/receiver/collector_connection.rb
@@ -16,18 +16,15 @@ module Aclog
_cr = -> bl { bl.call; @@queue.pop &_cr }
EM.defer { @@queue.pop &_cr }
end
-
- @@saved_tweets ||= []
end
def send_account(account)
- send_object(
- type: "account",
- id: account.id,
- oauth_token: account.oauth_token,
- oauth_token_secret: account.oauth_token_secret,
- user_id: account.user_id,
- consumer_version: account.consumer_version)
+ send_object(type: "account",
+ id: account.id,
+ oauth_token: account.oauth_token,
+ oauth_token_secret: account.oauth_token_secret,
+ user_id: account.user_id,
+ consumer_version: account.consumer_version)
Rails.logger.debug("Sent #{account.id}/#{account.user_id}")
end
@@ -36,8 +33,8 @@ module Aclog
end
def unbind
+ @connections.reject! {|k, v| v == self }
Rails.logger.info("Connection closed(#{@worker_number})")
- @connections.delete_if {|k, v| v == self }
end
def receive_data(data)
@@ -126,44 +123,18 @@ module Aclog
# unregister
end
- def receive_user(msg)
- @@queue.push -> do
- Rails.logger.debug("Received User(#{@worker_number}): #{msg["id"]}")
- User.from_hash(:id => msg["id"],
- :screen_name => msg["screen_name"],
- :name => msg["name"],
- :profile_image_url => msg["profile_image_url"],
- :protected => msg["protected"])
- end
- end
-
def receive_tweet(msg)
@@queue.push -> do
Rails.logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}")
- unless @@saved_tweets.include?(msg["id"])
- @@saved_tweets << msg["id"]
- if @@saved_tweets.size > 100000
- Rails.logger.debug("Tweet id dropped from cache: #{@@saved_tweets.shift}")
- end
-
- Tweet.from_hash(:id => msg["id"],
- :text => msg["text"],
- :source => msg["source"],
- :tweeted_at => Time.parse(msg["tweeted_at"]),
- :user_id => msg["user_id"])
- else
- Rails.logger.debug("Tweet already exists(#{@worker_number}): #{msg["id"]}")
- end
+ Tweet.from_receiver(msg)
end
end
def receive_favorite(msg)
@@queue.push -> do
Rails.logger.debug("Receive Favorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
- f = Favorite.from_hash(:tweet_id => msg["tweet_id"],
- :user_id => msg["user_id"])
- if Settings.notification.enabled && t = Tweet.find_by(id: msg["tweet_id"])
- Notification.notify_favorite(t)
+ if f = Favorite.from_receiver(msg)
+ f.tweet.notify_favorite
end
end
end
@@ -171,24 +142,21 @@ module Aclog
def receive_unfavorite(msg)
@@queue.push -> do
Rails.logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
- Favorite.delete_from_hash(:tweet_id => msg["tweet_id"],
- :user_id => msg["user_id"])
+ Favorite.delete_from_receiver(msg)
end
end
def receive_retweet(msg)
@@queue.push -> do
Rails.logger.debug("Receive Retweet(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
- Retweet.from_hash(:id => msg["id"],
- :tweet_id => msg["tweet_id"],
- :user_id => msg["user_id"])
+ Retweet.from_receiver(msg)
end
end
def receive_delete(msg)
@@queue.push -> do
Rails.logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}")
- Tweet.delete_from_id(msg["id"])
+ Tweet.delete_from_receiver(msg)
end
end
end