diff options
author | Kazuki Yamaguchi <k@rhe.jp> | 2015-05-15 15:05:47 +0900 |
---|---|---|
committer | Kazuki Yamaguchi <k@rhe.jp> | 2015-05-15 15:05:47 +0900 |
commit | 21147abf0527944efc23c38668772de6d794fbd7 (patch) | |
tree | dd6ee18b76b9db3dbcefda71fbfa06fb3b6ee1fd | |
parent | a9097ace972452c52a1989af74431a289f308d48 (diff) | |
parent | 3b28d3a42cbbffa199cdff03dcccc698cb6083f7 (diff) | |
download | aclog-21147abf0527944efc23c38668772de6d794fbd7.tar.gz |
Merge branch 'master' of gitlab.rhe.jp:rhenium/aclog
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | config/environments/development.rb | 2 | ||||
-rw-r--r-- | config/environments/production.rb | 2 | ||||
-rw-r--r-- | lib/collector/event_queue.rb | 6 | ||||
-rw-r--r-- | worker_node/lib/event_channel.rb | 10 | ||||
-rw-r--r-- | worker_node/lib/user_connection.rb | 4 |
6 files changed, 15 insertions, 11 deletions
@@ -29,7 +29,7 @@ Collects favs and retweets in real time by UserStreams. * Atom feed ## Requirements -* Ruby 2.1+ +* Ruby 2.2+ * MySQL/MariaDB 5.5.14+ (needs utf8mb4 support) * memcached * JavaScript runtime (see https://github.com/rails/execjs) diff --git a/config/environments/development.rb b/config/environments/development.rb index 620acb1..6e2c297 100644 --- a/config/environments/development.rb +++ b/config/environments/development.rb @@ -39,5 +39,5 @@ Rails.application.configure do # Raises error for missing translations # config.action_view.raise_on_missing_translations = true - config.cache_store = :dalli_store, Settings.cache.memcached, { namespace: "aclog-web:", pool_size: 5, expires_in: Settings.cache.expires_in } + config.cache_store = :dalli_store, Settings.cache.memcached, { namespace: "aclog-web", pool_size: 5, expires_in: Settings.cache.expires_in } end diff --git a/config/environments/production.rb b/config/environments/production.rb index 72c59f1..cfcf62d 100644 --- a/config/environments/production.rb +++ b/config/environments/production.rb @@ -55,7 +55,7 @@ Rails.application.configure do # config.logger = ActiveSupport::TaggedLogging.new(SyslogLogger.new) # Use a different cache store in production. - config.cache_store = :dalli_store, Settings.cache.memcached, { namespace: "aclog-web:", pool_size: 5, expires_in: Settings.cache.expires_in } + config.cache_store = :dalli_store, Settings.cache.memcached, { namespace: "aclog-web", pool_size: 5, expires_in: Settings.cache.expires_in } # Enable serving of images, stylesheets, and JavaScripts from an asset server. # config.action_controller.asset_host = 'http://assets.example.com' diff --git a/lib/collector/event_queue.rb b/lib/collector/event_queue.rb index 3851d82..936263b 100644 --- a/lib/collector/event_queue.rb +++ b/lib/collector/event_queue.rb @@ -93,8 +93,10 @@ module Collector private def cache(object) if id = object[:identifier] - unless @dalli.get(id) - @dalli.set(id, true) + key, val = id.split("#", 2) + cur = @dalli.get(id) + if !cur || (val && (cur <=> val) == -1) # not found or new + @dalli.set(key, true || value) yield end else diff --git a/worker_node/lib/event_channel.rb b/worker_node/lib/event_channel.rb index 9574bc7..c1c0d9c 100644 --- a/worker_node/lib/event_channel.rb +++ b/worker_node/lib/event_channel.rb @@ -2,18 +2,20 @@ class EventChannel class << self def setup return if @dalli - @dalli = Dalli::Client.new(Settings.memcached, namespace: "aclog-worker-node:") + @dalli = Dalli::Client.new(Settings.memcached, namespace: "aclog-worker-node") @channel = EM::Channel.new end def push(data) raise ScriptError, "Call EventChannel.setup first" unless @dalli if id = data[:identifier] - if @dalli.get(id) - WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{id}" } + key, val = id.split("#", 2) + cur = @dalli.get(key) + if cur && (!val || (cur <=> val) > -1) + WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{key}" } return else - @dalli.set(id, true) + @dalli.set(key, val || true) end end @channel << data diff --git a/worker_node/lib/user_connection.rb b/worker_node/lib/user_connection.rb index d8a75ad..1842ff8 100644 --- a/worker_node/lib/user_connection.rb +++ b/worker_node/lib/user_connection.rb @@ -95,7 +95,7 @@ class UserConnection log(:debug, "Tweet: #{json[:user][:id]} => #{json[:id]}") on_user(json[:user]) EventChannel << { event: :tweet, - identifier: "tweet-#{json[:id]}-#{json[:favorite_count]}-#{json[:retweet_count]}", + identifier: "tweet-#{json[:id]}##{json[:timestamp_ms]}-#{json[:favorite_count]}-#{json[:retweet_count]}", data: compact_tweet(json) } end @@ -117,7 +117,7 @@ class UserConnection on_user(json[:target]) on_tweet(json[:target_object]) EventChannel << { event: json[:event].to_sym, - identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target][:id]}-#{json[:target_object][:id]}", + identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target_object][:id]}", data: { timestamp_ms: json[:timestamp_ms], source: { id: json[:source][:id] }, target: { id: json[:target][:id] }, |