aboutsummaryrefslogtreecommitdiffstats
path: root/worker_node
diff options
context:
space:
mode:
Diffstat (limited to 'worker_node')
-rw-r--r--worker_node/lib/event_channel.rb10
-rw-r--r--worker_node/lib/user_connection.rb11
-rw-r--r--worker_node/lib/user_stream/client.rb10
3 files changed, 19 insertions, 12 deletions
diff --git a/worker_node/lib/event_channel.rb b/worker_node/lib/event_channel.rb
index 9574bc7..c1c0d9c 100644
--- a/worker_node/lib/event_channel.rb
+++ b/worker_node/lib/event_channel.rb
@@ -2,18 +2,20 @@ class EventChannel
class << self
def setup
return if @dalli
- @dalli = Dalli::Client.new(Settings.memcached, namespace: "aclog-worker-node:")
+ @dalli = Dalli::Client.new(Settings.memcached, namespace: "aclog-worker-node")
@channel = EM::Channel.new
end
def push(data)
raise ScriptError, "Call EventChannel.setup first" unless @dalli
if id = data[:identifier]
- if @dalli.get(id)
- WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{id}" }
+ key, val = id.split("#", 2)
+ cur = @dalli.get(key)
+ if cur && (!val || (cur <=> val) > -1)
+ WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{key}" }
return
else
- @dalli.set(id, true)
+ @dalli.set(key, val || true)
end
end
@channel << data
diff --git a/worker_node/lib/user_connection.rb b/worker_node/lib/user_connection.rb
index 3d8be15..1842ff8 100644
--- a/worker_node/lib/user_connection.rb
+++ b/worker_node/lib/user_connection.rb
@@ -21,7 +21,7 @@ class UserConnection
end
def stop
- @client.close
+ @client.stop
log(:info, "Stopped: #{@account_id}")
end
@@ -35,7 +35,7 @@ class UserConnection
log(:warn, "Connection reset")
EM.add_timer(5) { @client.reconnect }
else
- log(:error, "Unknown error: #{error.inspect}")
+ log(:error, "Unknown error: #{error}")
end
end
@client.on_service_unavailable do |message|
@@ -54,7 +54,8 @@ class UserConnection
log(:warn, "420: #{message}")
end
@client.on_disconnected do
- @client.reconnect
+ log(:warn, "Disconnected")
+ EM.add_timer(5) { @client.reconnect }
end
@client.on_item do |item|
@@ -94,7 +95,7 @@ class UserConnection
log(:debug, "Tweet: #{json[:user][:id]} => #{json[:id]}")
on_user(json[:user])
EventChannel << { event: :tweet,
- identifier: "tweet-#{json[:id]}-#{json[:favorite_count]}-#{json[:retweet_count]}",
+ identifier: "tweet-#{json[:id]}##{json[:timestamp_ms]}-#{json[:favorite_count]}-#{json[:retweet_count]}",
data: compact_tweet(json) }
end
@@ -116,7 +117,7 @@ class UserConnection
on_user(json[:target])
on_tweet(json[:target_object])
EventChannel << { event: json[:event].to_sym,
- identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target][:id]}-#{json[:target_object][:id]}",
+ identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target_object][:id]}",
data: { timestamp_ms: json[:timestamp_ms],
source: { id: json[:source][:id] },
target: { id: json[:target][:id] },
diff --git a/worker_node/lib/user_stream/client.rb b/worker_node/lib/user_stream/client.rb
index 1f83914..4044292 100644
--- a/worker_node/lib/user_stream/client.rb
+++ b/worker_node/lib/user_stream/client.rb
@@ -7,7 +7,7 @@ module UserStream
def initialize(options = {})
@options = { compression: true }.merge(options).freeze
@callbacks = {}
- @closing = false
+ @exiting = false
end
def update(options = {})
@@ -20,8 +20,12 @@ module UserStream
connect
end
+ def stop
+ @exiting = true
+ close
+ end
+
def close
- @closing = true
@http.close
end
@@ -64,7 +68,7 @@ module UserStream
end
http.errback do
- callback(:error, http.error) unless @closing
+ callback(:error, http.error) unless @exiting
end
@http = http