From e4d1686cbd1e59ab968675166ae25a7891b39f1a Mon Sep 17 00:00:00 2001 From: Kazuki Yamaguchi Date: Tue, 12 May 2015 14:30:01 +0900 Subject: worker_node/collctor: cache with timestamp --- lib/collector/event_queue.rb | 6 ++++-- worker_node/lib/event_channel.rb | 8 +++++--- worker_node/lib/user_connection.rb | 4 ++-- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/collector/event_queue.rb b/lib/collector/event_queue.rb index 3851d82..936263b 100644 --- a/lib/collector/event_queue.rb +++ b/lib/collector/event_queue.rb @@ -93,8 +93,10 @@ module Collector private def cache(object) if id = object[:identifier] - unless @dalli.get(id) - @dalli.set(id, true) + key, val = id.split("#", 2) + cur = @dalli.get(id) + if !cur || (val && (cur <=> val) == -1) # not found or new + @dalli.set(key, true || value) yield end else diff --git a/worker_node/lib/event_channel.rb b/worker_node/lib/event_channel.rb index 9574bc7..3d1c262 100644 --- a/worker_node/lib/event_channel.rb +++ b/worker_node/lib/event_channel.rb @@ -9,11 +9,13 @@ class EventChannel def push(data) raise ScriptError, "Call EventChannel.setup first" unless @dalli if id = data[:identifier] - if @dalli.get(id) - WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{id}" } + key, val = id.split("#", 2) + cur = @dalli.get(key) + if cur && (!val || (cur <=> val) > -1) + WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{key}" } return else - @dalli.set(id, true) + @dalli.set(key, val || true) end end @channel << data diff --git a/worker_node/lib/user_connection.rb b/worker_node/lib/user_connection.rb index 3d8be15..196f750 100644 --- a/worker_node/lib/user_connection.rb +++ b/worker_node/lib/user_connection.rb @@ -94,7 +94,7 @@ class UserConnection log(:debug, "Tweet: #{json[:user][:id]} => #{json[:id]}") on_user(json[:user]) EventChannel << { event: :tweet, - identifier: "tweet-#{json[:id]}-#{json[:favorite_count]}-#{json[:retweet_count]}", + identifier: "tweet-#{json[:id]}##{json[:timestamp_ms]}-#{json[:favorite_count]}-#{json[:retweet_count]}", data: compact_tweet(json) } end @@ -116,7 +116,7 @@ class UserConnection on_user(json[:target]) on_tweet(json[:target_object]) EventChannel << { event: json[:event].to_sym, - identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target][:id]}-#{json[:target_object][:id]}", + identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target_object][:id]}", data: { timestamp_ms: json[:timestamp_ms], source: { id: json[:source][:id] }, target: { id: json[:target][:id] }, -- cgit v1.2.3