diff options
Diffstat (limited to 'worker_node')
-rw-r--r-- | worker_node/lib/event_channel.rb | 10 | ||||
-rw-r--r-- | worker_node/lib/user_connection.rb | 11 | ||||
-rw-r--r-- | worker_node/lib/user_stream/client.rb | 10 |
3 files changed, 19 insertions, 12 deletions
diff --git a/worker_node/lib/event_channel.rb b/worker_node/lib/event_channel.rb index 9574bc7..c1c0d9c 100644 --- a/worker_node/lib/event_channel.rb +++ b/worker_node/lib/event_channel.rb @@ -2,18 +2,20 @@ class EventChannel class << self def setup return if @dalli - @dalli = Dalli::Client.new(Settings.memcached, namespace: "aclog-worker-node:") + @dalli = Dalli::Client.new(Settings.memcached, namespace: "aclog-worker-node") @channel = EM::Channel.new end def push(data) raise ScriptError, "Call EventChannel.setup first" unless @dalli if id = data[:identifier] - if @dalli.get(id) - WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{id}" } + key, val = id.split("#", 2) + cur = @dalli.get(key) + if cur && (!val || (cur <=> val) > -1) + WorkerNode.logger.debug("UniqueChannel") { "Duplicate event: #{key}" } return else - @dalli.set(id, true) + @dalli.set(key, val || true) end end @channel << data diff --git a/worker_node/lib/user_connection.rb b/worker_node/lib/user_connection.rb index 3d8be15..1842ff8 100644 --- a/worker_node/lib/user_connection.rb +++ b/worker_node/lib/user_connection.rb @@ -21,7 +21,7 @@ class UserConnection end def stop - @client.close + @client.stop log(:info, "Stopped: #{@account_id}") end @@ -35,7 +35,7 @@ class UserConnection log(:warn, "Connection reset") EM.add_timer(5) { @client.reconnect } else - log(:error, "Unknown error: #{error.inspect}") + log(:error, "Unknown error: #{error}") end end @client.on_service_unavailable do |message| @@ -54,7 +54,8 @@ class UserConnection log(:warn, "420: #{message}") end @client.on_disconnected do - @client.reconnect + log(:warn, "Disconnected") + EM.add_timer(5) { @client.reconnect } end @client.on_item do |item| @@ -94,7 +95,7 @@ class UserConnection log(:debug, "Tweet: #{json[:user][:id]} => #{json[:id]}") on_user(json[:user]) EventChannel << { event: :tweet, - identifier: "tweet-#{json[:id]}-#{json[:favorite_count]}-#{json[:retweet_count]}", + identifier: "tweet-#{json[:id]}##{json[:timestamp_ms]}-#{json[:favorite_count]}-#{json[:retweet_count]}", data: compact_tweet(json) } end @@ -116,7 +117,7 @@ class UserConnection on_user(json[:target]) on_tweet(json[:target_object]) EventChannel << { event: json[:event].to_sym, - identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target][:id]}-#{json[:target_object][:id]}", + identifier: "#{json[:event]}-#{json[:timestamp_ms]}-#{json[:source][:id]}-#{json[:target_object][:id]}", data: { timestamp_ms: json[:timestamp_ms], source: { id: json[:source][:id] }, target: { id: json[:target][:id] }, diff --git a/worker_node/lib/user_stream/client.rb b/worker_node/lib/user_stream/client.rb index 1f83914..4044292 100644 --- a/worker_node/lib/user_stream/client.rb +++ b/worker_node/lib/user_stream/client.rb @@ -7,7 +7,7 @@ module UserStream def initialize(options = {}) @options = { compression: true }.merge(options).freeze @callbacks = {} - @closing = false + @exiting = false end def update(options = {}) @@ -20,8 +20,12 @@ module UserStream connect end + def stop + @exiting = true + close + end + def close - @closing = true @http.close end @@ -64,7 +68,7 @@ module UserStream end http.errback do - callback(:error, http.error) unless @closing + callback(:error, http.error) unless @exiting end @http = http |