From 643103a7146f64d81815029e14b98ae96185ea4b Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Fri, 22 Jan 2016 23:38:56 +0900 Subject: collector: refactor NotificationQueue --- collector/daemon.rb | 10 +++---- collector/event_queue.rb | 12 +++++++-- collector/notification_queue.rb | 60 ++++++++++++++++++----------------------- 3 files changed, 41 insertions(+), 41 deletions(-) (limited to 'collector') diff --git a/collector/daemon.rb b/collector/daemon.rb index 9378ee2..8f7ab05 100644 --- a/collector/daemon.rb +++ b/collector/daemon.rb @@ -14,6 +14,9 @@ module Collector @start_time = Time.now set_loggers + dalli = Dalli::Client.new(Settings.cache.memcached, namespace: "aclog-collector:") + dalli.alive! + EM.run do sock_path = File.join(Rails.root, "tmp", "sockets", "collector.sock") File.delete(sock_path) if File.exist?(sock_path) @@ -21,11 +24,8 @@ module Collector control.listen(MessagePack::RPC::UNIXServerTransport.new(sock_path), Collector::ControlServer.new) EM.defer { control.run } - event_queue = Collector::EventQueue.new - EM.add_periodic_timer(Settings.collector.flush_interval) do - event_queue.flush - end - NotificationQueue.start + event_queue = EventQueue.start(dalli) + NotificationQueue.start(dalli) nodes = EM.start_server("0.0.0.0", Settings.collector.server_port, Collector::NodeConnection, event_queue) diff --git a/collector/event_queue.rb b/collector/event_queue.rb index 6dce058..7131b54 100644 --- a/collector/event_queue.rb +++ b/collector/event_queue.rb @@ -1,7 +1,7 @@ module Collector class EventQueue - def initialize - @dalli = Dalli::Client.new(Settings.cache.memcached, namespace: "aclog-collector:") + def initialize(dalli) + @dalli = dalli @queue_mutex = Mutex.new @queue_user = Queue.new @@ -13,6 +13,14 @@ module Collector @queue_unauthorized = Queue.new end + def self.start(dalli) + instance = self.new(dalli) + EM.add_periodic_timer(Settings.collector.flush_interval) do + instance.flush + end + instance + end + def flush users = tweets = favorites = retweets = unfavorites = deletes = unauthorizeds = nil diff --git a/collector/notification_queue.rb b/collector/notification_queue.rb index 8665aae..7f923e4 100644 --- a/collector/notification_queue.rb +++ b/collector/notification_queue.rb @@ -1,7 +1,15 @@ module Collector class NotificationQueue - def initialize + def initialize(dalli) + @dalli = dalli @queue = Queue.new # not EM::Queue + @thresholds = Settings.notification.favorites.freeze + @clients = Settings.notification.accounts.map { |hash| + Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key, + consumer_secret: Settings.notification.consumer.secret, + access_token: hash.token, + access_token_secret: hash.secret) + }.freeze end def run @@ -11,8 +19,11 @@ module Collector ids = @queue.pop next if ids.empty? - Tweet.where(id: ids).includes(user: :account).each do |tweet| - perform_tweet(tweet) + Tweet.where(id: ids).joins(user: :account).each do |tweet| + acc = tweet.user&.account + if acc&.active? && acc&.notification_enabled? + perform_tweet(tweet) + end end end end @@ -23,49 +34,30 @@ module Collector private def perform_tweet(tweet) - last_count = Rails.cache.read("notification/tweets/#{tweet.id}/favorites_count") - Rails.cache.write("notification/tweets/#{tweet.id}/favorites_count", [last_count || 0, tweet.favorites_count].max) + last_count = @dalli.get("notification/tweets/#{tweet.id}/favorites_count") + @dalli.set("notification/tweets/#{tweet.id}/favorites_count", [last_count || 0, tweet.favorites_count].max) - if last_count - t_count = Settings.notification.favorites.select { |m| last_count < m && m <= tweet.favorites_count }.last - else - t_count = Settings.notification.favorites.include?(tweet.favorites_count) && tweet.favorites_count - end - - if t_count - notify(tweet, "#{t_count}likes!") + if last_count && (t_count = @thresholds.select { |m| last_count < m && m <= tweet.favorites_count }.last) || + @thresholds.include?(t_count = tweet.favorites_count) + post("@#{tweet.user.screen_name} #{t_count}likes! #{tweet_url(tweet)}", tweet.id) end end - def notify(tweet, text) - user = tweet.user - account = user.account - - if account && account.active? && account.notification_enabled? - post("@#{user.screen_name} #{text} #{Settings.base_url}/i/#{tweet.id}", tweet.id) - end + def tweet_url(tweet) + "#{Settings.base_url}/i/#{tweet.id}" end def post(text, reply_to = 0) - Settings.notification.accounts.each do |hash| + @clients.each do |client| begin - client(hash).update(text, in_reply_to_status_id: reply_to) + client.update(text, in_reply_to_status_id: reply_to) break rescue Twitter::Error::Forbidden => e - raise e unless e.message = "User is over daily status update limit." + raise e unless e.message == "User is over daily status update limit." end end end - def client(acc) - @_client ||= {} - @_client[acc] ||= - Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key, - consumer_secret: Settings.notification.consumer.secret, - access_token: acc.token, - access_token_secret: acc.secret) - end - class << self def push(ids) instance.push(ids) @@ -75,8 +67,8 @@ module Collector @queue or raise(ArgumentError, "NotificationQueue is not initialized") end - def start - @queue = NotificationQueue.new + def start(dalli) + @queue = NotificationQueue.new(dalli) EM.defer { @queue.run } end end -- cgit v1.2.3