aboutsummaryrefslogtreecommitdiffstats
path: root/worker_node
diff options
context:
space:
mode:
authorrhenium <rhenium@rhe.jp>2014-05-15 06:18:31 +0900
committerrhenium <rhenium@rhe.jp>2014-05-15 06:18:31 +0900
commit72b01f47c3f8b4e435b2534b8a912e5ebdc3c603 (patch)
tree4b4fc45483439d06775c8a5f869639b4c5a414ef /worker_node
parentfa128cbbd24a6f46b397e5f3b3c2f4f8c77ea522 (diff)
downloadaclog-72b01f47c3f8b4e435b2534b8a912e5ebdc3c603.tar.gz
worker_node: cache streaming events history in worker_node to reduce network traffic
Diffstat (limited to 'worker_node')
-rw-r--r--worker_node/lib/worker_node.rb1
-rw-r--r--worker_node/lib/worker_node/collector_connection.rb9
-rw-r--r--worker_node/lib/worker_node/event_queue.rb25
-rw-r--r--worker_node/lib/worker_node/user_stream.rb40
-rw-r--r--worker_node/lib/worker_node/worker.rb2
-rw-r--r--worker_node/settings.yml.example1
6 files changed, 56 insertions, 22 deletions
diff --git a/worker_node/lib/worker_node.rb b/worker_node/lib/worker_node.rb
index 129d872..fdd7579 100644
--- a/worker_node/lib/worker_node.rb
+++ b/worker_node/lib/worker_node.rb
@@ -1,4 +1,5 @@
require "yaml"
+require "worker_node/event_queue"
require "worker_node/worker"
require "worker_node/collector_connection"
require "worker_node/user_stream"
diff --git a/worker_node/lib/worker_node/collector_connection.rb b/worker_node/lib/worker_node/collector_connection.rb
index aacb2fb..c77d698 100644
--- a/worker_node/lib/worker_node/collector_connection.rb
+++ b/worker_node/lib/worker_node/collector_connection.rb
@@ -6,7 +6,14 @@ module WorkerNode
def initialize
@streams = {}
@unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
+ @event_queue = EventQueue.new
@exiting = false
+
+ blk = ->(event) do
+ send_message(event[0], event[1])
+ @event_queue.pop &blk
+ end
+ @event_queue.pop &blk
end
def post_init
@@ -67,7 +74,7 @@ module WorkerNode
@streams[account_id].update(msg)
log(:info, "Updated account: #{account_id}")
else
- stream = UserStream.new(msg, method(:send_message))
+ stream = UserStream.new(msg, @event_queue)
stream.start
@streams[account_id] = stream
log(:info, "Registered account: #{account_id}")
diff --git a/worker_node/lib/worker_node/event_queue.rb b/worker_node/lib/worker_node/event_queue.rb
new file mode 100644
index 0000000..6311a25
--- /dev/null
+++ b/worker_node/lib/worker_node/event_queue.rb
@@ -0,0 +1,25 @@
+module WorkerNode
+ class EventQueue
+ def initialize
+ @cache = {}
+ @queue = EM::Queue.new
+ end
+
+ def push(type, event)
+ if event[:unique_id] && @cache.key?(event[:unique_id])
+ WorkerNode.logger.debug("[EventQueue] Duplicate event: #{event[:unique_id]}")
+ else
+ @queue << [type, event]
+ if event[:unique_id]
+ @cache[event[:unique_id]] = true
+ @cache.shift if @cache.size > Settings.cache_size
+ # Hash#shift seems to delete the first item (CRuby 2.0.0-2.1.2) (ref: hash.c: rb_hash_shift)
+ end
+ end
+ end
+
+ def pop(&blk)
+ @queue.pop &blk
+ end
+ end
+end
diff --git a/worker_node/lib/worker_node/user_stream.rb b/worker_node/lib/worker_node/user_stream.rb
index b39ca49..e99f9dd 100644
--- a/worker_node/lib/worker_node/user_stream.rb
+++ b/worker_node/lib/worker_node/user_stream.rb
@@ -3,10 +3,10 @@ require "yajl"
module WorkerNode
class UserStream
- def initialize(msg, send_message)
+ def initialize(msg, queue)
@user_id = msg[:user_id]
@account_id = msg[:id]
- @send_message = send_message
+ @queue = queue
prepare_client(msg)
end
@@ -27,10 +27,6 @@ module WorkerNode
end
private
- def send_message(event, data)
- @send_message.call(event, data)
- end
-
def prepare_client(msg)
client = EM::Twitter::Client.new(client_opts(msg))
@@ -53,7 +49,7 @@ module WorkerNode
client.on_unauthorized do
log(:warn, "Unauthorized")
- send_message(:unauthorized, id: @account_id, user_id: @user_id)
+ @queue.push(:unauthorized, id: @account_id, user_id: @user_id)
self.stop
end
@@ -94,34 +90,39 @@ module WorkerNode
def on_tweet(json)
log(:debug, "Tweet: #{json[:user][:id]} => #{json[:id]}")
- send_message(:tweet, reduce_tweet(json))
+ @queue.push(:tweet,
+ reduce_tweet(json).merge(
+ unique_id: json[:id]))
end
def on_retweet(json)
log(:debug, "Retweet: #{json[:user][:id]} => #{json[:retweeted_status][:id]}")
- send_message(:retweet,
- id: json[:id],
- user: reduce_user(json[:user]),
- retweeted_status: reduce_tweet(json[:retweeted_status]))
+ @queue.push(:retweet,
+ id: json[:id],
+ user: reduce_user(json[:user]),
+ retweeted_status: reduce_tweet(json[:retweeted_status]),
+ unique_id: json[:id])
end
def on_favorite(json)
log(:debug, "Favorite: #{json[:source][:id]} => #{json[:target_object][:id]}")
- send_message(:favorite,
- source: reduce_user(json[:source]),
- target_object: reduce_tweet(json[:target_object]))
+ @queue.push(:favorite,
+ source: reduce_user(json[:source]),
+ target_object: reduce_tweet(json[:target_object]),
+ unique_id: "fav-#{json[:created_at]}-#{json[:source][:id]}-#{json[:target_object][:id]}")
end
def on_unfavorite(json)
log(:debug, "Unfavorite: #{json[:source][:id]} => #{json[:target_object][:id]}")
- send_message(:unfavorite,
- source: reduce_user(json[:source]),
- target_object: reduce_tweet(json[:target_object]))
+ @queue.push(:unfavorite,
+ source: reduce_user(json[:source]),
+ target_object: reduce_tweet(json[:target_object]),
+ unique_id: "unfav-#{json[:created_at]}-#{json[:source][:id]}-#{json[:target_object][:id]}")
end
def on_delete(json)
log(:debug, "Delete: #{json[:delete][:status]}")
- send_message(:delete, json)
+ @queue.push(:delete, json)
end
def client_opts(msg)
@@ -129,7 +130,6 @@ module WorkerNode
method: :get,
host: "userstream.twitter.com",
path: "/1.1/user.json",
- params: { with: "user" },
oauth: {
consumer_key: msg[:consumer_key],
consumer_secret: msg[:consumer_secret],
diff --git a/worker_node/lib/worker_node/worker.rb b/worker_node/lib/worker_node/worker.rb
index 1ebe3ff..86f32a7 100644
--- a/worker_node/lib/worker_node/worker.rb
+++ b/worker_node/lib/worker_node/worker.rb
@@ -7,7 +7,7 @@ module WorkerNode
stop = proc do
puts "Stopping all connections...."
connection.exit
- EM.add_timer(2) do
+ EM.add_timer(0.1) do
EM.stop
end
end
diff --git a/worker_node/settings.yml.example b/worker_node/settings.yml.example
index a3554a0..b46d6fd 100644
--- a/worker_node/settings.yml.example
+++ b/worker_node/settings.yml.example
@@ -2,3 +2,4 @@ secret_key: secret key
collector_host: localhost
collector_port: 42106
log_level: debug
+cache_size: 10000