diff options
author | rhenium <rhenium@rhe.jp> | 2014-09-14 07:58:27 +0900 |
---|---|---|
committer | rhenium <rhenium@rhe.jp> | 2014-09-14 07:58:27 +0900 |
commit | f6eb29604f6d3728834fe427e14c37c9c4526004 (patch) | |
tree | 3fddb75f86c099a057bc308945e76076a5bc1430 | |
parent | 4edd466b1a79dc96ae8abb57dcd1805bd983f38d (diff) | |
download | aclog-f6eb29604f6d3728834fe427e14c37c9c4526004.tar.gz |
worker_node: refactor event queue
-rw-r--r-- | worker_node/lib/worker_node.rb | 2 | ||||
-rw-r--r-- | worker_node/lib/worker_node/collector_connection.rb | 8 | ||||
-rw-r--r-- | worker_node/lib/worker_node/event_queue.rb | 25 | ||||
-rw-r--r-- | worker_node/lib/worker_node/unique_channel.rb | 27 | ||||
-rw-r--r-- | worker_node/lib/worker_node/user_stream.rb | 16 |
5 files changed, 39 insertions, 39 deletions
diff --git a/worker_node/lib/worker_node.rb b/worker_node/lib/worker_node.rb index fdd7579..fb3f2fa 100644 --- a/worker_node/lib/worker_node.rb +++ b/worker_node/lib/worker_node.rb @@ -1,5 +1,5 @@ require "yaml" -require "worker_node/event_queue" +require "worker_node/unique_channel" require "worker_node/worker" require "worker_node/collector_connection" require "worker_node/user_stream" diff --git a/worker_node/lib/worker_node/collector_connection.rb b/worker_node/lib/worker_node/collector_connection.rb index bdb142e..dca9dfd 100644 --- a/worker_node/lib/worker_node/collector_connection.rb +++ b/worker_node/lib/worker_node/collector_connection.rb @@ -6,14 +6,12 @@ module WorkerNode def initialize @streams = {} @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) - @event_queue = EventQueue.new + @event_channel = UniqueChannel.new {|item| item[:unique_id] } @exiting = false - blk = ->(event) do + @event_channel.subscribe do |event| send_message(event[0], event[1]) - @event_queue.pop &blk end - @event_queue.pop &blk end def post_init @@ -74,7 +72,7 @@ module WorkerNode @streams[account_id].update(msg) log(:info, "Updated account: #{account_id}") else - stream = UserStream.new(msg, @event_queue) + stream = UserStream.new(msg, @event_channel) stream.start @streams[account_id] = stream log(:info, "Registered account: #{account_id}") diff --git a/worker_node/lib/worker_node/event_queue.rb b/worker_node/lib/worker_node/event_queue.rb deleted file mode 100644 index 6311a25..0000000 --- a/worker_node/lib/worker_node/event_queue.rb +++ /dev/null @@ -1,25 +0,0 @@ -module WorkerNode - class EventQueue - def initialize - @cache = {} - @queue = EM::Queue.new - end - - def push(type, event) - if event[:unique_id] && @cache.key?(event[:unique_id]) - WorkerNode.logger.debug("[EventQueue] Duplicate event: #{event[:unique_id]}") - else - @queue << [type, event] - if event[:unique_id] - @cache[event[:unique_id]] = true - @cache.shift if @cache.size > Settings.cache_size - # Hash#shift seems to delete the first item (CRuby 2.0.0-2.1.2) (ref: hash.c: rb_hash_shift) - end - end - end - - def pop(&blk) - @queue.pop &blk - end - end -end diff --git a/worker_node/lib/worker_node/unique_channel.rb b/worker_node/lib/worker_node/unique_channel.rb new file mode 100644 index 0000000..fbefbdd --- /dev/null +++ b/worker_node/lib/worker_node/unique_channel.rb @@ -0,0 +1,27 @@ +module WorkerNode + class UniqueChannel + def initialize(&blk) + @cache = {} + @channel = EM::Channel.new + @block = blk + end + + def push(type, event) + u = @block.call(event) + if @cache.key?(u) + WorkerNode.logger.debug("[UniqueChannel] Duplicate event: #{u}") + else + @channel.push [type, event] + if u + @cache[u] = true + @cache.shift if @cache.size > Settings.cache_size + # Hash#shift seems to delete the first item (CRuby 2.0.0-2.1.2) (ref: hash.c: rb_hash_shift) + end + end + end + + def subscribe(&blk) + @channel.subscribe &blk + end + end +end diff --git a/worker_node/lib/worker_node/user_stream.rb b/worker_node/lib/worker_node/user_stream.rb index fd8a095..d2fabcd 100644 --- a/worker_node/lib/worker_node/user_stream.rb +++ b/worker_node/lib/worker_node/user_stream.rb @@ -3,10 +3,10 @@ require "yajl" module WorkerNode class UserStream - def initialize(msg, queue) + def initialize(msg, channel) @user_id = msg[:user_id] @account_id = msg[:id] - @queue = queue + @channel = channel prepare_client(msg) end @@ -49,7 +49,7 @@ module WorkerNode client.on_unauthorized do log(:warn, "Unauthorized") - @queue.push(:unauthorized, id: @account_id, user_id: @user_id) + @channel.push(:unauthorized, id: @account_id, user_id: @user_id) self.stop end @@ -90,14 +90,14 @@ module WorkerNode def on_tweet(json) log(:debug, "Tweet: #{json[:user][:id]} => #{json[:id]}") - @queue.push(:tweet, + @channel.push(:tweet, reduce_tweet(json).merge( unique_id: json[:id])) end def on_retweet(json) log(:debug, "Retweet: #{json[:user][:id]} => #{json[:retweeted_status][:id]}") - @queue.push(:retweet, + @channel.push(:retweet, id: json[:id], user: reduce_user(json[:user]), retweeted_status: reduce_tweet(json[:retweeted_status]), @@ -106,7 +106,7 @@ module WorkerNode def on_favorite(json) log(:debug, "Favorite: #{json[:source][:id]} => #{json[:target_object][:id]}") - @queue.push(:favorite, + @channel.push(:favorite, source: reduce_user(json[:source]), target_object: reduce_tweet(json[:target_object]), unique_id: "fav-#{json[:created_at]}-#{json[:source][:id]}-#{json[:target_object][:id]}") @@ -114,7 +114,7 @@ module WorkerNode def on_unfavorite(json) log(:debug, "Unfavorite: #{json[:source][:id]} => #{json[:target_object][:id]}") - @queue.push(:unfavorite, + @channel.push(:unfavorite, source: reduce_user(json[:source]), target_object: reduce_tweet(json[:target_object]), unique_id: "unfav-#{json[:created_at]}-#{json[:source][:id]}-#{json[:target_object][:id]}") @@ -122,7 +122,7 @@ module WorkerNode def on_delete(json) log(:debug, "Delete: #{json[:delete][:status]}") - @queue.push(:delete, + @channel.push(:delete, json.merge( unique_id: "delete-#{json[:delete][:status][:id]}")) end |