diff options
author | rhenium <rhenium@rhe.jp> | 2015-04-06 15:38:18 +0900 |
---|---|---|
committer | rhenium <rhenium@rhe.jp> | 2015-04-06 15:38:18 +0900 |
commit | 8e8b16724e9c450b89e91cc13d563b0d79b20e76 (patch) | |
tree | dfd61c373f8783fa554c7e7766fd5fbdcf07c224 | |
parent | 450be06c45a5d3eb02ed1df742d4dc3fb5fcaabf (diff) | |
download | aclog-8e8b16724e9c450b89e91cc13d563b0d79b20e76.tar.gz |
collector: use memcached as cache
-rw-r--r-- | lib/collector/event_queue.rb | 86 |
1 files changed, 48 insertions, 38 deletions
diff --git a/lib/collector/event_queue.rb b/lib/collector/event_queue.rb index bc61373..766094e 100644 --- a/lib/collector/event_queue.rb +++ b/lib/collector/event_queue.rb @@ -1,58 +1,68 @@ module Collector class EventQueue def initialize - @queue_user = {} - @queue_tweet = {} - @queue_favorite = [] - @queue_retweet = [] - @queue_unfavorite = [] - @queue_delete = [] - @queue_unauthorized = [] + @dalli = Dalli::Client.new(Settings.cache.memcached, namespace: "aclog-collector:") + @queue_mutex = Mutex.new + + @queue_user = Queue.new + @queue_tweet = Queue.new + @queue_favorite = Queue.new + @queue_retweet = Queue.new + @queue_unfavorite = Queue.new + @queue_delete = Queue.new + @queue_unauthorized = Queue.new end def flush - queue_user = @queue_user; @queue_user = {} - queue_tweet = @queue_tweet; @queue_tweet = {} - queue_favorite = @queue_favorite; @queue_favorite = [] - queue_retweet = @queue_retweet; @queue_retweet = [] - queue_unfavorite = @queue_unfavorite; @queue_unfavorite = [] - queue_delete = @queue_delete; @queue_delete = [] - queue_unauthorized = @queue_unauthorized; @queue_unauthorized = [] + users = tweets = favorites = retweets = unfavorites = deletes = unauthorizeds = nil + + @queue_mutex.synchronize do + users = @queue_user.size.times.map { @queue_user.deq } + tweets = @queue_tweet.size.times.map { @queue_tweet.deq } + favorites = @queue_favorite.size.times.map { @queue_favorite.deq } + retweets = @queue_retweet.size.times.map { @queue_retweet.deq } + unfavorites = @queue_unfavorite.size.times.map { @queue_unfavorite.deq } + deletes = @queue_delete.size.times.map { @queue_delete.deq } + unauthorizeds = @queue_unauthorized.size.times.map { @queue_unauthorized.deq } + end - User.create_or_update_bulk_from_json(queue_user.values) - Tweet.create_bulk_from_json(queue_tweet.values) - Favorite.create_bulk_from_json(queue_favorite) - Retweet.create_bulk_from_json(queue_retweet) - Favorite.delete_bulk_from_json(queue_unfavorite) + User.create_or_update_bulk_from_json(users) + Tweet.create_bulk_from_json(tweets) + Favorite.create_bulk_from_json(favorites) + Retweet.create_bulk_from_json(retweets) + Favorite.delete_bulk_from_json(unfavorites) - if queue_delete.size > 0 - Tweet.destroy_bulk_from_json(queue_delete) - Retweet.delete_bulk_from_json(queue_delete) + if deletes.size > 0 + Tweet.destroy_bulk_from_json(deletes) + Retweet.delete_bulk_from_json(deletes) end - queue_favorite.each do |event| + favorites.each do |event| Notification.try_notify_favorites(id: event[:target_object][:id], user_id: event[:target_object][:user][:id], favorites_count: event[:target_object][:favorite_count]) end - queue_unauthorized.each do |a| + unauthorizeds.each do |a| account = Account.find(a[:id]) account.verify_token! end end def push_user(user) - @queue_user[user[:id]] = user + cache(user.merge!(identifier: user[:id])) do + @queue_user << user + end end def push_tweet(tweet) - push_user(tweet[:user]) - @queue_tweet[tweet[:id]] = tweet + cache(tweet.merge!(identifier: "tweet-#{tweet[:id]}-#{tweet[:favorite_count]}-#{tweet[:retweet_count]}")) do + @queue_tweet << tweet + end end def push_favorite(event) - caching(:favorite, "#{event[:created_at]}-#{event[:source][:id]}-#{event[:target_object][:id]}") do + cache(event.merge!(identifier: "favorite-#{event[:timestamp_ms]}-#{event[:source][:id]}-#{event[:target_object][:id]}")) do push_tweet(event[:target_object]) push_user(event[:source]) @queue_favorite << event @@ -60,7 +70,7 @@ module Collector end def push_retweet(status) - caching(:retweet, status[:id]) do + cache(status.merge!(identifier: "retweet-#{status[:id]}")) do push_user(status[:user]) push_tweet(status[:retweeted_status]) @queue_retweet << status @@ -68,7 +78,7 @@ module Collector end def push_unfavorite(event) - caching(:unfavorite, "#{event[:created_at]}-#{event[:source][:id]}-#{event[:target_object][:id]}") do + cache(event.merge!(identifier: "unfavorite-#{event[:timestamp_ms]}-#{event[:source][:id]}-#{event[:target_object][:id]}")) do push_tweet(event[:target_object]) push_user(event[:source]) @queue_unfavorite << event @@ -76,7 +86,7 @@ module Collector end def push_delete(delete) - caching(:delete, delete[:delete][:status][:id]) do + cache(delete.merge(identifier: "delete-#{delete[:delete][:status][:id]}")) do @queue_delete << delete end end @@ -86,14 +96,14 @@ module Collector end private - def caching(type, unique_key) - @_cache ||= {} - store = (@_cache[type] ||= {}) - - unless store.key?(unique_key) + def cache(object) + if id = object[:identifier] + unless @dalli.get(id) + @dalli.set(id, true) + yield + end + else yield - store[unique_key] = true - store.shift if store.size > Settings.collector.cache_size end end end |