diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-05-12 14:30:01 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-05-12 14:30:01 +0900 |
commit | e4d1686cbd1e59ab968675166ae25a7891b39f1a (patch) | |
tree | 9d21d701d77f05ba9ab6c5e097fe55043a4a2db6 | |
parent | f1051e1e7e70aa9c9864b3826060397ec508260b (diff) | |
download | aclog-e4d1686cbd1e59ab968675166ae25a7891b39f1a.tar.gz |
worker_node/collctor: cache with timestamp
-rw-r--r-- | lib/collector/event_queue.rb | 6 | ||||
-rw-r--r-- | worker_node/lib/event_channel.rb | 8 | ||||
-rw-r--r-- | worker_node/lib/user_connection.rb | 4 |
3 files changed, 11 insertions, 7 deletions
diff --git a/lib/collector/event_queue.rb b/lib/collector/event_queue.rb index 3851d82..936263b 100644 --- a/lib/collector/event_queue.rb +++ b/lib/collector/event_queue.rb @@ -93,8 +93,10 @@ module Collector private def cache(object) if id = object[:identifier] - unless @dalli.get(id) - @dalli.set(id, true) + key, val = id.split("#", 2) + cur = @dalli.get(id) + if !cur || (val && (cur <=> val) == -1) # not found or new + @dalli.set(key, true || value) yield end else diff --git a/worker_node/lib/event_channel.rb b/worker_node/lib/event_channel.rb index 9574bc7..3d1c262 100644 --- a/worker_node/lib/event_channel.rb +++ b/worker_node/lib/event_channel.rb @@ -9,11 +9,13 @@ class EventChannel def push(data) raise ScriptError, "Call EventChannel.setup first" unless @dalli if id = data[:identifier] - if @dalli.get(id) - WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{id}" } + key, val = id.split("#", 2) + cur = @dalli.get(key) + if cur && (!val || (cur <=> val) > -1) + WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{key}" } return else - @dalli.set(id, true) + @dalli.set(key, val || true) end end @channel << data diff --git a/worker_node/lib/user_connection.rb b/worker_node/lib/user_connection.rb index 3d8be15..196f750 100644 --- a/worker_node/lib/user_connection.rb +++ b/worker_node/lib/user_connection.rb @@ -94,7 +94,7 @@ class UserConnection log(:debug, "Tweet: #{json[:user][:id]} => #{json[:id]}") on_user(json[:user]) EventChannel << { event: :tweet, - identifier: "tweet-#{json[:id]}-#{json[:favorite_count]}-#{json[:retweet_count]}", + identifier: "tweet-#{json[:id]}##{json[:timestamp_ms]}-#{json[:favorite_count]}-#{json[:retweet_count]}", data: compact_tweet(json) } end @@ -116,7 +116,7 @@ class UserConnection on_user(json[:target]) on_tweet(json[:target_object]) EventChannel << { event: json[:event].to_sym, - identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target][:id]}-#{json[:target_object][:id]}", + identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target_object][:id]}", data: { timestamp_ms: json[:timestamp_ms], source: { id: json[:source][:id] }, target: { id: json[:target][:id] }, |