aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2016-01-22 22:21:22 +0900
committerKazuki Yamaguchi <k@rhe.jp>2016-01-22 22:21:22 +0900
commitbc3476686dfe0f32caef01751ff2689bafbf458e (patch)
treeb733ad4f8099ff83a1559cf33cf3d59a7d591c75
parent1fc56280e2ee284ab341d62be588e0462602dd92 (diff)
downloadaclog-bc3476686dfe0f32caef01751ff2689bafbf458e.tar.gz
collector: move TweetResponseNotificationJob to collector/notification_queue
-rw-r--r--app/jobs/tweet_response_notification_job.rb61
-rw-r--r--collector/daemon.rb2
-rw-r--r--collector/event_queue.rb6
-rw-r--r--collector/notification_queue.rb84
4 files changed, 88 insertions, 65 deletions
diff --git a/app/jobs/tweet_response_notification_job.rb b/app/jobs/tweet_response_notification_job.rb
deleted file mode 100644
index 0c6ed7f..0000000
--- a/app/jobs/tweet_response_notification_job.rb
+++ /dev/null
@@ -1,61 +0,0 @@
-class TweetResponseNotificationJob < ActiveJob::Base
- queue_as :default
-
- # Notifies the count of favovorites for the tweet with tweeting a reply from notification account.
- # Notification will be send only when the count reached the specified number in settings.yml.
- # THIS METHOD IS NOT THREAD SAFE
- #
- # @param [Array] An array of tweets ids
- def perform(tweet_ids)
- return unless Settings.notification.enabled
-
- Tweet.where(id: tweet_ids).includes(user: :account).each do |tweet|
- perform_tweet(tweet)
- end
- end
-
- private
- def perform_tweet(tweet)
- last_count = Rails.cache.read("notification/tweets/#{ tweet.id }/favorites_count")
- Rails.cache.write("notification/tweets/#{ tweet.id }/favorites_count", [last_count || 0, tweet.favorites_count].max)
-
- if last_count
- t_count = Settings.notification.favorites.select {|m| last_count < m && m <= tweet.favorites_count }.last
- else
- t_count = Settings.notification.favorites.include?(tweet.favorites_count) && tweet.favorites_count
- end
-
- if t_count
- notify(tweet, "#{ t_count } likes!")
- end
- end
-
- def notify(tweet, text)
- user = tweet.user
- account = user.account
-
- if account && account.active? && account.notification_enabled?
- post("@#{user.screen_name} #{text} #{Settings.base_url}/i/#{tweet.id}", tweet.id)
- end
- end
-
- def post(text, reply_to = 0)
- Settings.notification.accounts.each do |hash|
- begin
- client(hash).update(text, in_reply_to_status_id: reply_to)
- break
- rescue Twitter::Error::Forbidden => e
- raise e unless e.message = "User is over daily status update limit."
- end
- end
- end
-
- def client(acc)
- @_client ||= {}
- @_client[acc] ||=
- Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key,
- consumer_secret: Settings.notification.consumer.secret,
- access_token: acc.token,
- access_token_secret: acc.secret)
- end
-end
diff --git a/collector/daemon.rb b/collector/daemon.rb
index a978952..9378ee2 100644
--- a/collector/daemon.rb
+++ b/collector/daemon.rb
@@ -3,6 +3,7 @@ require_relative "event_queue"
require_relative "node_connection"
require_relative "node_manager"
require_relative "control_server"
+require_relative "notification_queue"
module Collector
module Daemon
@@ -24,6 +25,7 @@ module Collector
EM.add_periodic_timer(Settings.collector.flush_interval) do
event_queue.flush
end
+ NotificationQueue.start
nodes = EM.start_server("0.0.0.0", Settings.collector.server_port, Collector::NodeConnection, event_queue)
diff --git a/collector/event_queue.rb b/collector/event_queue.rb
index 0c80653..6dce058 100644
--- a/collector/event_queue.rb
+++ b/collector/event_queue.rb
@@ -41,10 +41,8 @@ module Collector
end
if Settings.notification.enabled
- tweet_ids = favorites.map {|f| f[:target_object][:id] }
- if tweet_ids.size > 0
- TweetResponseNotificationJob.perform_later(tweet_ids)
- end
+ tweet_ids = favorites.map {|f| f.dig(:target_object, :id) }
+ NotificationQueue.push(tweet_ids)
end
if unauthorizeds.size > 0
diff --git a/collector/notification_queue.rb b/collector/notification_queue.rb
new file mode 100644
index 0000000..8665aae
--- /dev/null
+++ b/collector/notification_queue.rb
@@ -0,0 +1,84 @@
+module Collector
+ class NotificationQueue
+ def initialize
+ @queue = Queue.new # not EM::Queue
+ end
+
+ def run
+ return unless Settings.notification.enabled
+
+ while true
+ ids = @queue.pop
+ next if ids.empty?
+
+ Tweet.where(id: ids).includes(user: :account).each do |tweet|
+ perform_tweet(tweet)
+ end
+ end
+ end
+
+ def push(ids)
+ @queue << ids
+ end
+
+ private
+ def perform_tweet(tweet)
+ last_count = Rails.cache.read("notification/tweets/#{tweet.id}/favorites_count")
+ Rails.cache.write("notification/tweets/#{tweet.id}/favorites_count", [last_count || 0, tweet.favorites_count].max)
+
+ if last_count
+ t_count = Settings.notification.favorites.select { |m| last_count < m && m <= tweet.favorites_count }.last
+ else
+ t_count = Settings.notification.favorites.include?(tweet.favorites_count) && tweet.favorites_count
+ end
+
+ if t_count
+ notify(tweet, "#{t_count}likes!")
+ end
+ end
+
+ def notify(tweet, text)
+ user = tweet.user
+ account = user.account
+
+ if account && account.active? && account.notification_enabled?
+ post("@#{user.screen_name} #{text} #{Settings.base_url}/i/#{tweet.id}", tweet.id)
+ end
+ end
+
+ def post(text, reply_to = 0)
+ Settings.notification.accounts.each do |hash|
+ begin
+ client(hash).update(text, in_reply_to_status_id: reply_to)
+ break
+ rescue Twitter::Error::Forbidden => e
+ raise e unless e.message = "User is over daily status update limit."
+ end
+ end
+ end
+
+ def client(acc)
+ @_client ||= {}
+ @_client[acc] ||=
+ Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key,
+ consumer_secret: Settings.notification.consumer.secret,
+ access_token: acc.token,
+ access_token_secret: acc.secret)
+ end
+
+ class << self
+ def push(ids)
+ instance.push(ids)
+ end
+
+ def instance
+ @queue or raise(ArgumentError, "NotificationQueue is not initialized")
+ end
+
+ def start
+ @queue = NotificationQueue.new
+ EM.defer { @queue.run }
+ end
+ end
+ end
+end