diff options
author | rhenium <rhenium@rhe.jp> | 2014-05-15 06:18:31 +0900 |
---|---|---|
committer | rhenium <rhenium@rhe.jp> | 2014-05-15 06:18:31 +0900 |
commit | 72b01f47c3f8b4e435b2534b8a912e5ebdc3c603 (patch) | |
tree | 4b4fc45483439d06775c8a5f869639b4c5a414ef /worker_node | |
parent | fa128cbbd24a6f46b397e5f3b3c2f4f8c77ea522 (diff) | |
download | aclog-72b01f47c3f8b4e435b2534b8a912e5ebdc3c603.tar.gz |
worker_node: cache streaming events history in worker_node to reduce network traffic
Diffstat (limited to 'worker_node')
-rw-r--r-- | worker_node/lib/worker_node.rb | 1 | ||||
-rw-r--r-- | worker_node/lib/worker_node/collector_connection.rb | 9 | ||||
-rw-r--r-- | worker_node/lib/worker_node/event_queue.rb | 25 | ||||
-rw-r--r-- | worker_node/lib/worker_node/user_stream.rb | 40 | ||||
-rw-r--r-- | worker_node/lib/worker_node/worker.rb | 2 | ||||
-rw-r--r-- | worker_node/settings.yml.example | 1 |
6 files changed, 56 insertions, 22 deletions
diff --git a/worker_node/lib/worker_node.rb b/worker_node/lib/worker_node.rb index 129d872..fdd7579 100644 --- a/worker_node/lib/worker_node.rb +++ b/worker_node/lib/worker_node.rb @@ -1,4 +1,5 @@ require "yaml" +require "worker_node/event_queue" require "worker_node/worker" require "worker_node/collector_connection" require "worker_node/user_stream" diff --git a/worker_node/lib/worker_node/collector_connection.rb b/worker_node/lib/worker_node/collector_connection.rb index aacb2fb..c77d698 100644 --- a/worker_node/lib/worker_node/collector_connection.rb +++ b/worker_node/lib/worker_node/collector_connection.rb @@ -6,7 +6,14 @@ module WorkerNode def initialize @streams = {} @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) + @event_queue = EventQueue.new @exiting = false + + blk = ->(event) do + send_message(event[0], event[1]) + @event_queue.pop &blk + end + @event_queue.pop &blk end def post_init @@ -67,7 +74,7 @@ module WorkerNode @streams[account_id].update(msg) log(:info, "Updated account: #{account_id}") else - stream = UserStream.new(msg, method(:send_message)) + stream = UserStream.new(msg, @event_queue) stream.start @streams[account_id] = stream log(:info, "Registered account: #{account_id}") diff --git a/worker_node/lib/worker_node/event_queue.rb b/worker_node/lib/worker_node/event_queue.rb new file mode 100644 index 0000000..6311a25 --- /dev/null +++ b/worker_node/lib/worker_node/event_queue.rb @@ -0,0 +1,25 @@ +module WorkerNode + class EventQueue + def initialize + @cache = {} + @queue = EM::Queue.new + end + + def push(type, event) + if event[:unique_id] && @cache.key?(event[:unique_id]) + WorkerNode.logger.debug("[EventQueue] Duplicate event: #{event[:unique_id]}") + else + @queue << [type, event] + if event[:unique_id] + @cache[event[:unique_id]] = true + @cache.shift if @cache.size > Settings.cache_size + # Hash#shift seems to delete the first item (CRuby 2.0.0-2.1.2) (ref: hash.c: rb_hash_shift) + end + end + end + + def pop(&blk) + @queue.pop &blk + end + end +end diff --git a/worker_node/lib/worker_node/user_stream.rb b/worker_node/lib/worker_node/user_stream.rb index b39ca49..e99f9dd 100644 --- a/worker_node/lib/worker_node/user_stream.rb +++ b/worker_node/lib/worker_node/user_stream.rb @@ -3,10 +3,10 @@ require "yajl" module WorkerNode class UserStream - def initialize(msg, send_message) + def initialize(msg, queue) @user_id = msg[:user_id] @account_id = msg[:id] - @send_message = send_message + @queue = queue prepare_client(msg) end @@ -27,10 +27,6 @@ module WorkerNode end private - def send_message(event, data) - @send_message.call(event, data) - end - def prepare_client(msg) client = EM::Twitter::Client.new(client_opts(msg)) @@ -53,7 +49,7 @@ module WorkerNode client.on_unauthorized do log(:warn, "Unauthorized") - send_message(:unauthorized, id: @account_id, user_id: @user_id) + @queue.push(:unauthorized, id: @account_id, user_id: @user_id) self.stop end @@ -94,34 +90,39 @@ module WorkerNode def on_tweet(json) log(:debug, "Tweet: #{json[:user][:id]} => #{json[:id]}") - send_message(:tweet, reduce_tweet(json)) + @queue.push(:tweet, + reduce_tweet(json).merge( + unique_id: json[:id])) end def on_retweet(json) log(:debug, "Retweet: #{json[:user][:id]} => #{json[:retweeted_status][:id]}") - send_message(:retweet, - id: json[:id], - user: reduce_user(json[:user]), - retweeted_status: reduce_tweet(json[:retweeted_status])) + @queue.push(:retweet, + id: json[:id], + user: reduce_user(json[:user]), + retweeted_status: reduce_tweet(json[:retweeted_status]), + unique_id: json[:id]) end def on_favorite(json) log(:debug, "Favorite: #{json[:source][:id]} => #{json[:target_object][:id]}") - send_message(:favorite, - source: reduce_user(json[:source]), - target_object: reduce_tweet(json[:target_object])) + @queue.push(:favorite, + source: reduce_user(json[:source]), + target_object: reduce_tweet(json[:target_object]), + unique_id: "fav-#{json[:created_at]}-#{json[:source][:id]}-#{json[:target_object][:id]}") end def on_unfavorite(json) log(:debug, "Unfavorite: #{json[:source][:id]} => #{json[:target_object][:id]}") - send_message(:unfavorite, - source: reduce_user(json[:source]), - target_object: reduce_tweet(json[:target_object])) + @queue.push(:unfavorite, + source: reduce_user(json[:source]), + target_object: reduce_tweet(json[:target_object]), + unique_id: "unfav-#{json[:created_at]}-#{json[:source][:id]}-#{json[:target_object][:id]}") end def on_delete(json) log(:debug, "Delete: #{json[:delete][:status]}") - send_message(:delete, json) + @queue.push(:delete, json) end def client_opts(msg) @@ -129,7 +130,6 @@ module WorkerNode method: :get, host: "userstream.twitter.com", path: "/1.1/user.json", - params: { with: "user" }, oauth: { consumer_key: msg[:consumer_key], consumer_secret: msg[:consumer_secret], diff --git a/worker_node/lib/worker_node/worker.rb b/worker_node/lib/worker_node/worker.rb index 1ebe3ff..86f32a7 100644 --- a/worker_node/lib/worker_node/worker.rb +++ b/worker_node/lib/worker_node/worker.rb @@ -7,7 +7,7 @@ module WorkerNode stop = proc do puts "Stopping all connections...." connection.exit - EM.add_timer(2) do + EM.add_timer(0.1) do EM.stop end end diff --git a/worker_node/settings.yml.example b/worker_node/settings.yml.example index a3554a0..b46d6fd 100644 --- a/worker_node/settings.yml.example +++ b/worker_node/settings.yml.example @@ -2,3 +2,4 @@ secret_key: secret key collector_host: localhost collector_port: 42106 log_level: debug +cache_size: 10000 |