aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrhenium <rhenium@rhe.jp>2014-09-14 07:58:27 +0900
committerrhenium <rhenium@rhe.jp>2014-09-14 07:58:27 +0900
commitf6eb29604f6d3728834fe427e14c37c9c4526004 (patch)
tree3fddb75f86c099a057bc308945e76076a5bc1430
parent4edd466b1a79dc96ae8abb57dcd1805bd983f38d (diff)
downloadaclog-f6eb29604f6d3728834fe427e14c37c9c4526004.tar.gz
worker_node: refactor event queue
-rw-r--r--worker_node/lib/worker_node.rb2
-rw-r--r--worker_node/lib/worker_node/collector_connection.rb8
-rw-r--r--worker_node/lib/worker_node/event_queue.rb25
-rw-r--r--worker_node/lib/worker_node/unique_channel.rb27
-rw-r--r--worker_node/lib/worker_node/user_stream.rb16
5 files changed, 39 insertions, 39 deletions
diff --git a/worker_node/lib/worker_node.rb b/worker_node/lib/worker_node.rb
index fdd7579..fb3f2fa 100644
--- a/worker_node/lib/worker_node.rb
+++ b/worker_node/lib/worker_node.rb
@@ -1,5 +1,5 @@
require "yaml"
-require "worker_node/event_queue"
+require "worker_node/unique_channel"
require "worker_node/worker"
require "worker_node/collector_connection"
require "worker_node/user_stream"
diff --git a/worker_node/lib/worker_node/collector_connection.rb b/worker_node/lib/worker_node/collector_connection.rb
index bdb142e..dca9dfd 100644
--- a/worker_node/lib/worker_node/collector_connection.rb
+++ b/worker_node/lib/worker_node/collector_connection.rb
@@ -6,14 +6,12 @@ module WorkerNode
def initialize
@streams = {}
@unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
- @event_queue = EventQueue.new
+ @event_channel = UniqueChannel.new {|item| item[:unique_id] }
@exiting = false
- blk = ->(event) do
+ @event_channel.subscribe do |event|
send_message(event[0], event[1])
- @event_queue.pop &blk
end
- @event_queue.pop &blk
end
def post_init
@@ -74,7 +72,7 @@ module WorkerNode
@streams[account_id].update(msg)
log(:info, "Updated account: #{account_id}")
else
- stream = UserStream.new(msg, @event_queue)
+ stream = UserStream.new(msg, @event_channel)
stream.start
@streams[account_id] = stream
log(:info, "Registered account: #{account_id}")
diff --git a/worker_node/lib/worker_node/event_queue.rb b/worker_node/lib/worker_node/event_queue.rb
deleted file mode 100644
index 6311a25..0000000
--- a/worker_node/lib/worker_node/event_queue.rb
+++ /dev/null
@@ -1,25 +0,0 @@
-module WorkerNode
- class EventQueue
- def initialize
- @cache = {}
- @queue = EM::Queue.new
- end
-
- def push(type, event)
- if event[:unique_id] && @cache.key?(event[:unique_id])
- WorkerNode.logger.debug("[EventQueue] Duplicate event: #{event[:unique_id]}")
- else
- @queue << [type, event]
- if event[:unique_id]
- @cache[event[:unique_id]] = true
- @cache.shift if @cache.size > Settings.cache_size
- # Hash#shift seems to delete the first item (CRuby 2.0.0-2.1.2) (ref: hash.c: rb_hash_shift)
- end
- end
- end
-
- def pop(&blk)
- @queue.pop &blk
- end
- end
-end
diff --git a/worker_node/lib/worker_node/unique_channel.rb b/worker_node/lib/worker_node/unique_channel.rb
new file mode 100644
index 0000000..fbefbdd
--- /dev/null
+++ b/worker_node/lib/worker_node/unique_channel.rb
@@ -0,0 +1,27 @@
+module WorkerNode
+ class UniqueChannel
+ def initialize(&blk)
+ @cache = {}
+ @channel = EM::Channel.new
+ @block = blk
+ end
+
+ def push(type, event)
+ u = @block.call(event)
+ if @cache.key?(u)
+ WorkerNode.logger.debug("[UniqueChannel] Duplicate event: #{u}")
+ else
+ @channel.push [type, event]
+ if u
+ @cache[u] = true
+ @cache.shift if @cache.size > Settings.cache_size
+ # Hash#shift seems to delete the first item (CRuby 2.0.0-2.1.2) (ref: hash.c: rb_hash_shift)
+ end
+ end
+ end
+
+ def subscribe(&blk)
+ @channel.subscribe &blk
+ end
+ end
+end
diff --git a/worker_node/lib/worker_node/user_stream.rb b/worker_node/lib/worker_node/user_stream.rb
index fd8a095..d2fabcd 100644
--- a/worker_node/lib/worker_node/user_stream.rb
+++ b/worker_node/lib/worker_node/user_stream.rb
@@ -3,10 +3,10 @@ require "yajl"
module WorkerNode
class UserStream
- def initialize(msg, queue)
+ def initialize(msg, channel)
@user_id = msg[:user_id]
@account_id = msg[:id]
- @queue = queue
+ @channel = channel
prepare_client(msg)
end
@@ -49,7 +49,7 @@ module WorkerNode
client.on_unauthorized do
log(:warn, "Unauthorized")
- @queue.push(:unauthorized, id: @account_id, user_id: @user_id)
+ @channel.push(:unauthorized, id: @account_id, user_id: @user_id)
self.stop
end
@@ -90,14 +90,14 @@ module WorkerNode
def on_tweet(json)
log(:debug, "Tweet: #{json[:user][:id]} => #{json[:id]}")
- @queue.push(:tweet,
+ @channel.push(:tweet,
reduce_tweet(json).merge(
unique_id: json[:id]))
end
def on_retweet(json)
log(:debug, "Retweet: #{json[:user][:id]} => #{json[:retweeted_status][:id]}")
- @queue.push(:retweet,
+ @channel.push(:retweet,
id: json[:id],
user: reduce_user(json[:user]),
retweeted_status: reduce_tweet(json[:retweeted_status]),
@@ -106,7 +106,7 @@ module WorkerNode
def on_favorite(json)
log(:debug, "Favorite: #{json[:source][:id]} => #{json[:target_object][:id]}")
- @queue.push(:favorite,
+ @channel.push(:favorite,
source: reduce_user(json[:source]),
target_object: reduce_tweet(json[:target_object]),
unique_id: "fav-#{json[:created_at]}-#{json[:source][:id]}-#{json[:target_object][:id]}")
@@ -114,7 +114,7 @@ module WorkerNode
def on_unfavorite(json)
log(:debug, "Unfavorite: #{json[:source][:id]} => #{json[:target_object][:id]}")
- @queue.push(:unfavorite,
+ @channel.push(:unfavorite,
source: reduce_user(json[:source]),
target_object: reduce_tweet(json[:target_object]),
unique_id: "unfav-#{json[:created_at]}-#{json[:source][:id]}-#{json[:target_object][:id]}")
@@ -122,7 +122,7 @@ module WorkerNode
def on_delete(json)
log(:debug, "Delete: #{json[:delete][:status]}")
- @queue.push(:delete,
+ @channel.push(:delete,
json.merge(
unique_id: "delete-#{json[:delete][:status][:id]}"))
end