aboutsummaryrefslogtreecommitdiffstats
path: root/collector
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2016-01-22 23:38:56 +0900
committerKazuki Yamaguchi <k@rhe.jp>2016-01-22 23:38:56 +0900
commit643103a7146f64d81815029e14b98ae96185ea4b (patch)
treed645acc3ad68036af9a2e8f6dde468737a5b81b2 /collector
parentbc3476686dfe0f32caef01751ff2689bafbf458e (diff)
downloadaclog-643103a7146f64d81815029e14b98ae96185ea4b.tar.gz
collector: refactor NotificationQueue
Diffstat (limited to 'collector')
-rw-r--r--collector/daemon.rb10
-rw-r--r--collector/event_queue.rb12
-rw-r--r--collector/notification_queue.rb60
3 files changed, 41 insertions, 41 deletions
diff --git a/collector/daemon.rb b/collector/daemon.rb
index 9378ee2..8f7ab05 100644
--- a/collector/daemon.rb
+++ b/collector/daemon.rb
@@ -14,6 +14,9 @@ module Collector
@start_time = Time.now
set_loggers
+ dalli = Dalli::Client.new(Settings.cache.memcached, namespace: "aclog-collector:")
+ dalli.alive!
+
EM.run do
sock_path = File.join(Rails.root, "tmp", "sockets", "collector.sock")
File.delete(sock_path) if File.exist?(sock_path)
@@ -21,11 +24,8 @@ module Collector
control.listen(MessagePack::RPC::UNIXServerTransport.new(sock_path), Collector::ControlServer.new)
EM.defer { control.run }
- event_queue = Collector::EventQueue.new
- EM.add_periodic_timer(Settings.collector.flush_interval) do
- event_queue.flush
- end
- NotificationQueue.start
+ event_queue = EventQueue.start(dalli)
+ NotificationQueue.start(dalli)
nodes = EM.start_server("0.0.0.0", Settings.collector.server_port, Collector::NodeConnection, event_queue)
diff --git a/collector/event_queue.rb b/collector/event_queue.rb
index 6dce058..7131b54 100644
--- a/collector/event_queue.rb
+++ b/collector/event_queue.rb
@@ -1,7 +1,7 @@
module Collector
class EventQueue
- def initialize
- @dalli = Dalli::Client.new(Settings.cache.memcached, namespace: "aclog-collector:")
+ def initialize(dalli)
+ @dalli = dalli
@queue_mutex = Mutex.new
@queue_user = Queue.new
@@ -13,6 +13,14 @@ module Collector
@queue_unauthorized = Queue.new
end
+ def self.start(dalli)
+ instance = self.new(dalli)
+ EM.add_periodic_timer(Settings.collector.flush_interval) do
+ instance.flush
+ end
+ instance
+ end
+
def flush
users = tweets = favorites = retweets = unfavorites = deletes = unauthorizeds = nil
diff --git a/collector/notification_queue.rb b/collector/notification_queue.rb
index 8665aae..7f923e4 100644
--- a/collector/notification_queue.rb
+++ b/collector/notification_queue.rb
@@ -1,7 +1,15 @@
module Collector
class NotificationQueue
- def initialize
+ def initialize(dalli)
+ @dalli = dalli
@queue = Queue.new # not EM::Queue
+ @thresholds = Settings.notification.favorites.freeze
+ @clients = Settings.notification.accounts.map { |hash|
+ Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key,
+ consumer_secret: Settings.notification.consumer.secret,
+ access_token: hash.token,
+ access_token_secret: hash.secret)
+ }.freeze
end
def run
@@ -11,8 +19,11 @@ module Collector
ids = @queue.pop
next if ids.empty?
- Tweet.where(id: ids).includes(user: :account).each do |tweet|
- perform_tweet(tweet)
+ Tweet.where(id: ids).joins(user: :account).each do |tweet|
+ acc = tweet.user&.account
+ if acc&.active? && acc&.notification_enabled?
+ perform_tweet(tweet)
+ end
end
end
end
@@ -23,49 +34,30 @@ module Collector
private
def perform_tweet(tweet)
- last_count = Rails.cache.read("notification/tweets/#{tweet.id}/favorites_count")
- Rails.cache.write("notification/tweets/#{tweet.id}/favorites_count", [last_count || 0, tweet.favorites_count].max)
+ last_count = @dalli.get("notification/tweets/#{tweet.id}/favorites_count")
+ @dalli.set("notification/tweets/#{tweet.id}/favorites_count", [last_count || 0, tweet.favorites_count].max)
- if last_count
- t_count = Settings.notification.favorites.select { |m| last_count < m && m <= tweet.favorites_count }.last
- else
- t_count = Settings.notification.favorites.include?(tweet.favorites_count) && tweet.favorites_count
- end
-
- if t_count
- notify(tweet, "#{t_count}likes!")
+ if last_count && (t_count = @thresholds.select { |m| last_count < m && m <= tweet.favorites_count }.last) ||
+ @thresholds.include?(t_count = tweet.favorites_count)
+ post("@#{tweet.user.screen_name} #{t_count}likes! #{tweet_url(tweet)}", tweet.id)
end
end
- def notify(tweet, text)
- user = tweet.user
- account = user.account
-
- if account && account.active? && account.notification_enabled?
- post("@#{user.screen_name} #{text} #{Settings.base_url}/i/#{tweet.id}", tweet.id)
- end
+ def tweet_url(tweet)
+ "#{Settings.base_url}/i/#{tweet.id}"
end
def post(text, reply_to = 0)
- Settings.notification.accounts.each do |hash|
+ @clients.each do |client|
begin
- client(hash).update(text, in_reply_to_status_id: reply_to)
+ client.update(text, in_reply_to_status_id: reply_to)
break
rescue Twitter::Error::Forbidden => e
- raise e unless e.message = "User is over daily status update limit."
+ raise e unless e.message == "User is over daily status update limit."
end
end
end
- def client(acc)
- @_client ||= {}
- @_client[acc] ||=
- Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key,
- consumer_secret: Settings.notification.consumer.secret,
- access_token: acc.token,
- access_token_secret: acc.secret)
- end
-
class << self
def push(ids)
instance.push(ids)
@@ -75,8 +67,8 @@ module Collector
@queue or raise(ArgumentError, "NotificationQueue is not initialized")
end
- def start
- @queue = NotificationQueue.new
+ def start(dalli)
+ @queue = NotificationQueue.new(dalli)
EM.defer { @queue.run }
end
end