aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKazuki Yamaguchi <k@rhe.jp>2015-05-15 15:05:47 +0900
committerKazuki Yamaguchi <k@rhe.jp>2015-05-15 15:05:47 +0900
commit21147abf0527944efc23c38668772de6d794fbd7 (patch)
treedd6ee18b76b9db3dbcefda71fbfa06fb3b6ee1fd
parenta9097ace972452c52a1989af74431a289f308d48 (diff)
parent3b28d3a42cbbffa199cdff03dcccc698cb6083f7 (diff)
downloadaclog-21147abf0527944efc23c38668772de6d794fbd7.tar.gz
Merge branch 'master' of gitlab.rhe.jp:rhenium/aclog
-rw-r--r--README.md2
-rw-r--r--config/environments/development.rb2
-rw-r--r--config/environments/production.rb2
-rw-r--r--lib/collector/event_queue.rb6
-rw-r--r--worker_node/lib/event_channel.rb10
-rw-r--r--worker_node/lib/user_connection.rb4
6 files changed, 15 insertions, 11 deletions
diff --git a/README.md b/README.md
index e7195ce..5534af6 100644
--- a/README.md
+++ b/README.md
@@ -29,7 +29,7 @@ Collects favs and retweets in real time by UserStreams.
* Atom feed
## Requirements
-* Ruby 2.1+
+* Ruby 2.2+
* MySQL/MariaDB 5.5.14+ (needs utf8mb4 support)
* memcached
* JavaScript runtime (see https://github.com/rails/execjs)
diff --git a/config/environments/development.rb b/config/environments/development.rb
index 620acb1..6e2c297 100644
--- a/config/environments/development.rb
+++ b/config/environments/development.rb
@@ -39,5 +39,5 @@ Rails.application.configure do
# Raises error for missing translations
# config.action_view.raise_on_missing_translations = true
- config.cache_store = :dalli_store, Settings.cache.memcached, { namespace: "aclog-web:", pool_size: 5, expires_in: Settings.cache.expires_in }
+ config.cache_store = :dalli_store, Settings.cache.memcached, { namespace: "aclog-web", pool_size: 5, expires_in: Settings.cache.expires_in }
end
diff --git a/config/environments/production.rb b/config/environments/production.rb
index 72c59f1..cfcf62d 100644
--- a/config/environments/production.rb
+++ b/config/environments/production.rb
@@ -55,7 +55,7 @@ Rails.application.configure do
# config.logger = ActiveSupport::TaggedLogging.new(SyslogLogger.new)
# Use a different cache store in production.
- config.cache_store = :dalli_store, Settings.cache.memcached, { namespace: "aclog-web:", pool_size: 5, expires_in: Settings.cache.expires_in }
+ config.cache_store = :dalli_store, Settings.cache.memcached, { namespace: "aclog-web", pool_size: 5, expires_in: Settings.cache.expires_in }
# Enable serving of images, stylesheets, and JavaScripts from an asset server.
# config.action_controller.asset_host = 'http://assets.example.com'
diff --git a/lib/collector/event_queue.rb b/lib/collector/event_queue.rb
index 3851d82..936263b 100644
--- a/lib/collector/event_queue.rb
+++ b/lib/collector/event_queue.rb
@@ -93,8 +93,10 @@ module Collector
private
def cache(object)
if id = object[:identifier]
- unless @dalli.get(id)
- @dalli.set(id, true)
+ key, val = id.split("#", 2)
+ cur = @dalli.get(id)
+ if !cur || (val && (cur <=> val) == -1) # not found or new
+ @dalli.set(key, true || value)
yield
end
else
diff --git a/worker_node/lib/event_channel.rb b/worker_node/lib/event_channel.rb
index 9574bc7..c1c0d9c 100644
--- a/worker_node/lib/event_channel.rb
+++ b/worker_node/lib/event_channel.rb
@@ -2,18 +2,20 @@ class EventChannel
class << self
def setup
return if @dalli
- @dalli = Dalli::Client.new(Settings.memcached, namespace: "aclog-worker-node:")
+ @dalli = Dalli::Client.new(Settings.memcached, namespace: "aclog-worker-node")
@channel = EM::Channel.new
end
def push(data)
raise ScriptError, "Call EventChannel.setup first" unless @dalli
if id = data[:identifier]
- if @dalli.get(id)
- WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{id}" }
+ key, val = id.split("#", 2)
+ cur = @dalli.get(key)
+ if cur && (!val || (cur <=> val) > -1)
+ WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{key}" }
return
else
- @dalli.set(id, true)
+ @dalli.set(key, val || true)
end
end
@channel << data
diff --git a/worker_node/lib/user_connection.rb b/worker_node/lib/user_connection.rb
index d8a75ad..1842ff8 100644
--- a/worker_node/lib/user_connection.rb
+++ b/worker_node/lib/user_connection.rb
@@ -95,7 +95,7 @@ class UserConnection
log(:debug, "Tweet: #{json[:user][:id]} => #{json[:id]}")
on_user(json[:user])
EventChannel << { event: :tweet,
- identifier: "tweet-#{json[:id]}-#{json[:favorite_count]}-#{json[:retweet_count]}",
+ identifier: "tweet-#{json[:id]}##{json[:timestamp_ms]}-#{json[:favorite_count]}-#{json[:retweet_count]}",
data: compact_tweet(json) }
end
@@ -117,7 +117,7 @@ class UserConnection
on_user(json[:target])
on_tweet(json[:target_object])
EventChannel << { event: json[:event].to_sym,
- identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target][:id]}-#{json[:target_object][:id]}",
+ identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target_object][:id]}",
data: { timestamp_ms: json[:timestamp_ms],
source: { id: json[:source][:id] },
target: { id: json[:target][:id] },