aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrhenium <rhenium@rhe.jp>2015-04-06 15:38:18 +0900
committerrhenium <rhenium@rhe.jp>2015-04-06 15:38:18 +0900
commit8e8b16724e9c450b89e91cc13d563b0d79b20e76 (patch)
treedfd61c373f8783fa554c7e7766fd5fbdcf07c224
parent450be06c45a5d3eb02ed1df742d4dc3fb5fcaabf (diff)
downloadaclog-8e8b16724e9c450b89e91cc13d563b0d79b20e76.tar.gz
collector: use memcached as cache
-rw-r--r--lib/collector/event_queue.rb86
1 files changed, 48 insertions, 38 deletions
diff --git a/lib/collector/event_queue.rb b/lib/collector/event_queue.rb
index bc61373..766094e 100644
--- a/lib/collector/event_queue.rb
+++ b/lib/collector/event_queue.rb
@@ -1,58 +1,68 @@
module Collector
class EventQueue
def initialize
- @queue_user = {}
- @queue_tweet = {}
- @queue_favorite = []
- @queue_retweet = []
- @queue_unfavorite = []
- @queue_delete = []
- @queue_unauthorized = []
+ @dalli = Dalli::Client.new(Settings.cache.memcached, namespace: "aclog-collector:")
+ @queue_mutex = Mutex.new
+
+ @queue_user = Queue.new
+ @queue_tweet = Queue.new
+ @queue_favorite = Queue.new
+ @queue_retweet = Queue.new
+ @queue_unfavorite = Queue.new
+ @queue_delete = Queue.new
+ @queue_unauthorized = Queue.new
end
def flush
- queue_user = @queue_user; @queue_user = {}
- queue_tweet = @queue_tweet; @queue_tweet = {}
- queue_favorite = @queue_favorite; @queue_favorite = []
- queue_retweet = @queue_retweet; @queue_retweet = []
- queue_unfavorite = @queue_unfavorite; @queue_unfavorite = []
- queue_delete = @queue_delete; @queue_delete = []
- queue_unauthorized = @queue_unauthorized; @queue_unauthorized = []
+ users = tweets = favorites = retweets = unfavorites = deletes = unauthorizeds = nil
+
+ @queue_mutex.synchronize do
+ users = @queue_user.size.times.map { @queue_user.deq }
+ tweets = @queue_tweet.size.times.map { @queue_tweet.deq }
+ favorites = @queue_favorite.size.times.map { @queue_favorite.deq }
+ retweets = @queue_retweet.size.times.map { @queue_retweet.deq }
+ unfavorites = @queue_unfavorite.size.times.map { @queue_unfavorite.deq }
+ deletes = @queue_delete.size.times.map { @queue_delete.deq }
+ unauthorizeds = @queue_unauthorized.size.times.map { @queue_unauthorized.deq }
+ end
- User.create_or_update_bulk_from_json(queue_user.values)
- Tweet.create_bulk_from_json(queue_tweet.values)
- Favorite.create_bulk_from_json(queue_favorite)
- Retweet.create_bulk_from_json(queue_retweet)
- Favorite.delete_bulk_from_json(queue_unfavorite)
+ User.create_or_update_bulk_from_json(users)
+ Tweet.create_bulk_from_json(tweets)
+ Favorite.create_bulk_from_json(favorites)
+ Retweet.create_bulk_from_json(retweets)
+ Favorite.delete_bulk_from_json(unfavorites)
- if queue_delete.size > 0
- Tweet.destroy_bulk_from_json(queue_delete)
- Retweet.delete_bulk_from_json(queue_delete)
+ if deletes.size > 0
+ Tweet.destroy_bulk_from_json(deletes)
+ Retweet.delete_bulk_from_json(deletes)
end
- queue_favorite.each do |event|
+ favorites.each do |event|
Notification.try_notify_favorites(id: event[:target_object][:id],
user_id: event[:target_object][:user][:id],
favorites_count: event[:target_object][:favorite_count])
end
- queue_unauthorized.each do |a|
+ unauthorizeds.each do |a|
account = Account.find(a[:id])
account.verify_token!
end
end
def push_user(user)
- @queue_user[user[:id]] = user
+ cache(user.merge!(identifier: user[:id])) do
+ @queue_user << user
+ end
end
def push_tweet(tweet)
- push_user(tweet[:user])
- @queue_tweet[tweet[:id]] = tweet
+ cache(tweet.merge!(identifier: "tweet-#{tweet[:id]}-#{tweet[:favorite_count]}-#{tweet[:retweet_count]}")) do
+ @queue_tweet << tweet
+ end
end
def push_favorite(event)
- caching(:favorite, "#{event[:created_at]}-#{event[:source][:id]}-#{event[:target_object][:id]}") do
+ cache(event.merge!(identifier: "favorite-#{event[:timestamp_ms]}-#{event[:source][:id]}-#{event[:target_object][:id]}")) do
push_tweet(event[:target_object])
push_user(event[:source])
@queue_favorite << event
@@ -60,7 +70,7 @@ module Collector
end
def push_retweet(status)
- caching(:retweet, status[:id]) do
+ cache(status.merge!(identifier: "retweet-#{status[:id]}")) do
push_user(status[:user])
push_tweet(status[:retweeted_status])
@queue_retweet << status
@@ -68,7 +78,7 @@ module Collector
end
def push_unfavorite(event)
- caching(:unfavorite, "#{event[:created_at]}-#{event[:source][:id]}-#{event[:target_object][:id]}") do
+ cache(event.merge!(identifier: "unfavorite-#{event[:timestamp_ms]}-#{event[:source][:id]}-#{event[:target_object][:id]}")) do
push_tweet(event[:target_object])
push_user(event[:source])
@queue_unfavorite << event
@@ -76,7 +86,7 @@ module Collector
end
def push_delete(delete)
- caching(:delete, delete[:delete][:status][:id]) do
+ cache(delete.merge(identifier: "delete-#{delete[:delete][:status][:id]}")) do
@queue_delete << delete
end
end
@@ -86,14 +96,14 @@ module Collector
end
private
- def caching(type, unique_key)
- @_cache ||= {}
- store = (@_cache[type] ||= {})
-
- unless store.key?(unique_key)
+ def cache(object)
+ if id = object[:identifier]
+ unless @dalli.get(id)
+ @dalli.set(id, true)
+ yield
+ end
+ else
yield
- store[unique_key] = true
- store.shift if store.size > Settings.collector.cache_size
end
end
end