aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-05-12 14:30:01 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-05-12 14:30:01 +0900
commite4d1686cbd1e59ab968675166ae25a7891b39f1a (patch)
tree9d21d701d77f05ba9ab6c5e097fe55043a4a2db6
parentf1051e1e7e70aa9c9864b3826060397ec508260b (diff)
downloadaclog-e4d1686cbd1e59ab968675166ae25a7891b39f1a.tar.gz
worker_node/collctor: cache with timestamp
-rw-r--r--lib/collector/event_queue.rb6
-rw-r--r--worker_node/lib/event_channel.rb8
-rw-r--r--worker_node/lib/user_connection.rb4
3 files changed, 11 insertions, 7 deletions
diff --git a/lib/collector/event_queue.rb b/lib/collector/event_queue.rb
index 3851d82..936263b 100644
--- a/lib/collector/event_queue.rb
+++ b/lib/collector/event_queue.rb
@@ -93,8 +93,10 @@ module Collector
private
def cache(object)
if id = object[:identifier]
- unless @dalli.get(id)
- @dalli.set(id, true)
+ key, val = id.split("#", 2)
+ cur = @dalli.get(id)
+ if !cur || (val && (cur <=> val) == -1) # not found or new
+ @dalli.set(key, true || value)
yield
end
else
diff --git a/worker_node/lib/event_channel.rb b/worker_node/lib/event_channel.rb
index 9574bc7..3d1c262 100644
--- a/worker_node/lib/event_channel.rb
+++ b/worker_node/lib/event_channel.rb
@@ -9,11 +9,13 @@ class EventChannel
def push(data)
raise ScriptError, "Call EventChannel.setup first" unless @dalli
if id = data[:identifier]
- if @dalli.get(id)
- WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{id}" }
+ key, val = id.split("#", 2)
+ cur = @dalli.get(key)
+ if cur && (!val || (cur <=> val) > -1)
+ WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{key}" }
return
else
- @dalli.set(id, true)
+ @dalli.set(key, val || true)
end
end
@channel << data
diff --git a/worker_node/lib/user_connection.rb b/worker_node/lib/user_connection.rb
index 3d8be15..196f750 100644
--- a/worker_node/lib/user_connection.rb
+++ b/worker_node/lib/user_connection.rb
@@ -94,7 +94,7 @@ class UserConnection
log(:debug, "Tweet: #{json[:user][:id]} => #{json[:id]}")
on_user(json[:user])
EventChannel << { event: :tweet,
- identifier: "tweet-#{json[:id]}-#{json[:favorite_count]}-#{json[:retweet_count]}",
+ identifier: "tweet-#{json[:id]}##{json[:timestamp_ms]}-#{json[:favorite_count]}-#{json[:retweet_count]}",
data: compact_tweet(json) }
end
@@ -116,7 +116,7 @@ class UserConnection
on_user(json[:target])
on_tweet(json[:target_object])
EventChannel << { event: json[:event].to_sym,
- identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target][:id]}-#{json[:target_object][:id]}",
+ identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target_object][:id]}",
data: { timestamp_ms: json[:timestamp_ms],
source: { id: json[:source][:id] },
target: { id: json[:target][:id] },