diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2016-01-22 22:21:22 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2016-01-22 22:21:22 +0900 |
commit | bc3476686dfe0f32caef01751ff2689bafbf458e (patch) | |
tree | b733ad4f8099ff83a1559cf33cf3d59a7d591c75 | |
parent | 1fc56280e2ee284ab341d62be588e0462602dd92 (diff) | |
download | aclog-bc3476686dfe0f32caef01751ff2689bafbf458e.tar.gz |
collector: move TweetResponseNotificationJob to collector/notification_queue
-rw-r--r-- | app/jobs/tweet_response_notification_job.rb | 61 | ||||
-rw-r--r-- | collector/daemon.rb | 2 | ||||
-rw-r--r-- | collector/event_queue.rb | 6 | ||||
-rw-r--r-- | collector/notification_queue.rb | 84 |
4 files changed, 88 insertions, 65 deletions
diff --git a/app/jobs/tweet_response_notification_job.rb b/app/jobs/tweet_response_notification_job.rb deleted file mode 100644 index 0c6ed7f..0000000 --- a/app/jobs/tweet_response_notification_job.rb +++ /dev/null @@ -1,61 +0,0 @@ -class TweetResponseNotificationJob < ActiveJob::Base - queue_as :default - - # Notifies the count of favovorites for the tweet with tweeting a reply from notification account. - # Notification will be send only when the count reached the specified number in settings.yml. - # THIS METHOD IS NOT THREAD SAFE - # - # @param [Array] An array of tweets ids - def perform(tweet_ids) - return unless Settings.notification.enabled - - Tweet.where(id: tweet_ids).includes(user: :account).each do |tweet| - perform_tweet(tweet) - end - end - - private - def perform_tweet(tweet) - last_count = Rails.cache.read("notification/tweets/#{ tweet.id }/favorites_count") - Rails.cache.write("notification/tweets/#{ tweet.id }/favorites_count", [last_count || 0, tweet.favorites_count].max) - - if last_count - t_count = Settings.notification.favorites.select {|m| last_count < m && m <= tweet.favorites_count }.last - else - t_count = Settings.notification.favorites.include?(tweet.favorites_count) && tweet.favorites_count - end - - if t_count - notify(tweet, "#{ t_count } likes!") - end - end - - def notify(tweet, text) - user = tweet.user - account = user.account - - if account && account.active? && account.notification_enabled? - post("@#{user.screen_name} #{text} #{Settings.base_url}/i/#{tweet.id}", tweet.id) - end - end - - def post(text, reply_to = 0) - Settings.notification.accounts.each do |hash| - begin - client(hash).update(text, in_reply_to_status_id: reply_to) - break - rescue Twitter::Error::Forbidden => e - raise e unless e.message = "User is over daily status update limit." - end - end - end - - def client(acc) - @_client ||= {} - @_client[acc] ||= - Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key, - consumer_secret: Settings.notification.consumer.secret, - access_token: acc.token, - access_token_secret: acc.secret) - end -end diff --git a/collector/daemon.rb b/collector/daemon.rb index a978952..9378ee2 100644 --- a/collector/daemon.rb +++ b/collector/daemon.rb @@ -3,6 +3,7 @@ require_relative "event_queue" require_relative "node_connection" require_relative "node_manager" require_relative "control_server" +require_relative "notification_queue" module Collector module Daemon @@ -24,6 +25,7 @@ module Collector EM.add_periodic_timer(Settings.collector.flush_interval) do event_queue.flush end + NotificationQueue.start nodes = EM.start_server("0.0.0.0", Settings.collector.server_port, Collector::NodeConnection, event_queue) diff --git a/collector/event_queue.rb b/collector/event_queue.rb index 0c80653..6dce058 100644 --- a/collector/event_queue.rb +++ b/collector/event_queue.rb @@ -41,10 +41,8 @@ module Collector end if Settings.notification.enabled - tweet_ids = favorites.map {|f| f[:target_object][:id] } - if tweet_ids.size > 0 - TweetResponseNotificationJob.perform_later(tweet_ids) - end + tweet_ids = favorites.map {|f| f.dig(:target_object, :id) } + NotificationQueue.push(tweet_ids) end if unauthorizeds.size > 0 diff --git a/collector/notification_queue.rb b/collector/notification_queue.rb new file mode 100644 index 0000000..8665aae --- /dev/null +++ b/collector/notification_queue.rb @@ -0,0 +1,84 @@ +module Collector + class NotificationQueue + def initialize + @queue = Queue.new # not EM::Queue + end + + def run + return unless Settings.notification.enabled + + while true + ids = @queue.pop + next if ids.empty? + + Tweet.where(id: ids).includes(user: :account).each do |tweet| + perform_tweet(tweet) + end + end + end + + def push(ids) + @queue << ids + end + + private + def perform_tweet(tweet) + last_count = Rails.cache.read("notification/tweets/#{tweet.id}/favorites_count") + Rails.cache.write("notification/tweets/#{tweet.id}/favorites_count", [last_count || 0, tweet.favorites_count].max) + + if last_count + t_count = Settings.notification.favorites.select { |m| last_count < m && m <= tweet.favorites_count }.last + else + t_count = Settings.notification.favorites.include?(tweet.favorites_count) && tweet.favorites_count + end + + if t_count + notify(tweet, "#{t_count}likes!") + end + end + + def notify(tweet, text) + user = tweet.user + account = user.account + + if account && account.active? && account.notification_enabled? + post("@#{user.screen_name} #{text} #{Settings.base_url}/i/#{tweet.id}", tweet.id) + end + end + + def post(text, reply_to = 0) + Settings.notification.accounts.each do |hash| + begin + client(hash).update(text, in_reply_to_status_id: reply_to) + break + rescue Twitter::Error::Forbidden => e + raise e unless e.message = "User is over daily status update limit." + end + end + end + + def client(acc) + @_client ||= {} + @_client[acc] ||= + Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key, + consumer_secret: Settings.notification.consumer.secret, + access_token: acc.token, + access_token_secret: acc.secret) + end + + class << self + def push(ids) + instance.push(ids) + end + + def instance + @queue or raise(ArgumentError, "NotificationQueue is not initialized") + end + + def start + @queue = NotificationQueue.new + EM.defer { @queue.run } + end + end + end +end |