aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorRhenium <rhenium@rhe.jp>2013-12-20 21:42:05 +0900
committerRhenium <rhenium@rhe.jp>2013-12-20 21:42:05 +0900
commit4b56cd6b67d890c5e2ed763656a7fd7832b10d8f (patch)
treeeb394e9c187b8bbc60b54d16b72200edbf9380ab /lib
parent376c50784c2c740c6caacd1e94550e3d9d77fe41 (diff)
downloadaclog-4b56cd6b67d890c5e2ed763656a7fd7832b10d8f.tar.gz
update workers
Diffstat (limited to 'lib')
-rw-r--r--lib/aclog/receiver/collector_connection.rb149
-rw-r--r--lib/aclog/receiver/register_server.rb1
-rw-r--r--lib/aclog/receiver/worker.rb6
3 files changed, 66 insertions, 90 deletions
diff --git a/lib/aclog/receiver/collector_connection.rb b/lib/aclog/receiver/collector_connection.rb
index 36f652f..9f43f00 100644
--- a/lib/aclog/receiver/collector_connection.rb
+++ b/lib/aclog/receiver/collector_connection.rb
@@ -1,166 +1,141 @@
-# -*- coding: utf-8 -*-
-require "time"
-
module Aclog
module Receiver
class CollectorConnection < EM::Connection
- def initialize(connections)
+ def initialize(channel, connections)
+ @channel = channel
@connections = connections
@worker_number = nil
- @pac = MessagePack::Unpacker.new
-
- unless defined? @@queue
- @@queue = EM::Queue.new
-
- _cr = -> bl { bl.call; @@queue.pop(&_cr) }
- EM.defer { @@queue.pop(&_cr) }
- end
+ @unpacker = MessagePack::Unpacker.new
end
def send_account(account)
send_object(type: "account",
id: account.id,
+ consumer_key: Settings.consumer.key,
+ consumer_secret: Settings.consumer.secret,
oauth_token: account.oauth_token,
oauth_token_secret: account.oauth_token_secret,
user_id: account.user_id)
- Rails.logger.debug("Sent #{account.id}/#{account.user_id}")
+ log(:debug, "send: #{account.id}/#{account.user_id}")
end
def send_stop_account(account)
send_object(type: "stop",
id: account.id)
- Rails.logger.debug("Sent Stop #{account.id}/#{account.user_id}")
+ log(:debug, "send stop: #{account.id}/#{account.user_id}")
end
def post_init
- # なにもしない。クライアントが
end
def unbind
@connections.reject! {|k, v| v == self }
- Rails.logger.info("Connection closed(#{@worker_number})")
+ log(:info, "connection closed")
end
def receive_data(data)
- @pac.feed_each(data) do |msg|
+ @unpacker.feed_each(data) do |msg|
unless msg.is_a?(Hash) && msg["type"]
- Rails.logger.warn("Unknown data: #{msg}")
- send_object(type: "fatal", message: "Unknown data")
+ log(:error, "unknown data: #{msg}")
+ send_object(type: "fatal", message: "unknown data")
close_connection_after_writing
return
end
- if not @authorized and not msg["type"] == "init"
- Rails.logger.warn("Not authorized client: #{msg}")
- send_object(type: "fatal", message: "You aren't authorized")
- close_connection_after_writing
+ unless @authorized
+ if msg["type"] == "auth"
+ auth(msg)
+ else
+ log(:warn, "not authorized client: #{msg}")
+ send_object(type: "fatal", message: "You aren't authorized")
+ close_connection_after_writing
+ end
return
end
case msg["type"]
- when "init"
- receive_init(msg)
when "unauthorized"
- receive_unauthorized(msg)
+ @channel << -> {
+ log(:warn, "unauthorized: #{msg["user_id"]}")
+ }
when "tweet"
- receive_tweet(msg)
+ @channel << -> {
+ log(:debug, "receive tweet: #{msg["id"]}")
+ Tweet.from_receiver(msg)
+ }
when "favorite"
- receive_favorite(msg)
+ @channel << -> {
+ log(:debug, "receive favorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}")
+ if f = Favorite.from_receiver(msg)
+ f.tweet.notify_favorite
+ end
+ }
when "unfavorite"
- receive_unfavorite(msg)
+ @channel << -> {
+ log(:debug, "receive unfavorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}")
+ Favorite.delete_from_receiver(msg)
+ }
when "retweet"
- receive_retweet(msg)
+ @channel << -> {
+ log(:debug, "receive retweet: #{msg["user"]["id"]} => #{msg["retweeted_status"]["id"]}")
+ Retweet.from_receiver(msg)
+ }
when "delete"
- receive_delete(msg)
+ @channel << -> {
+ log(:debug, "receive delete: #{msg["id"]}")
+ Tweet.delete_from_receiver(msg)
+ }
when "quit"
- Rails.logger.info("Quit(#{@worker_number}): #{msg["reason"]}")
+ log(:info, "receive quit: #{msg["reason"]}")
send_data(type: "quit", message: "Bye")
close_connection_after_writing
else
- Rails.logger.warn("Unknown message type(#{@worker_number}): #{msg["type"]}")
- send_object(type: "error", message: "Unknown message type: #{msg["type"]}")
+ log(:warn, "unknown message: #{msg["type"]}")
+ send_object(type: "error", message: "Unknown message type")
end
end
end
private
+ def log(level, message)
+ text = "[RECEIVER"
+ text << ":#{@worker_number}" if @worker_number
+ text << "] #{message}"
+ Rails.logger.__send__(level, text)
+ end
+
def send_object(data)
send_data(data.to_msgpack)
end
- def receive_init(msg)
+ def auth(msg)
secret_key = msg["secret_key"]
unless secret_key == Settings.collector.secret_key
- Rails.logger.warn("Invalid secret_key: \"#{secret_key}\"")
- send_object(type: "fatal", message: "Invalid secret_key")
+ log(:warn, "Invalid secret_key: \"#{secret_key}\"")
+ send_object(type: "fatal", message: "invalid secret_key")
close_connection_after_writing
return
end
- worker_number = Settings.collector.count.times.find {|num| !@connections.key?(num) }
+ worker_number = (Settings.collector.count.times.to_a - @connections.keys).sort.first
if worker_number == nil
- Rails.logger.warn("Invalid worker_number: #{worker_number}")
- send_object(type: "fatal", message: "Invalid worker_number")
+ log(:warn, "all connection alive")
+ send_object(type: "fatal", message: "all connection alive")
close_connection_after_writing
return
end
- if @connections[worker_number]
- @connections[worker_number].close_connection
- end
@connections[worker_number] = self
@worker_number = worker_number
@authorized = true
- Rails.logger.info("Connected(#{@worker_number})")
- send_object(type: "ok", message: "Connected")
+ log(:info, "connect")
+ send_object(type: "ok", message: "connected")
Account.set_of_collector(@worker_number).each do |account|
send_account(account)
end
end
-
- def receive_unauthorized(msg)
- Rails.logger.warn("Unauthorized(#{@worker_number}): #{msg["user_id"]}")
- # unregister
- end
-
- def receive_tweet(msg)
- @@queue.push -> do
- Rails.logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}")
- Tweet.from_receiver(msg)
- end
- end
-
- def receive_favorite(msg)
- @@queue.push -> do
- Rails.logger.debug("Receive Favorite(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}")
- if f = Favorite.from_receiver(msg)
- f.tweet.notify_favorite
- end
- end
- end
-
- def receive_unfavorite(msg)
- @@queue.push -> do
- Rails.logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}")
- Favorite.delete_from_receiver(msg)
- end
- end
-
- def receive_retweet(msg)
- @@queue.push -> do
- Rails.logger.debug("Receive Retweet(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}")
- Retweet.from_receiver(msg)
- end
- end
-
- def receive_delete(msg)
- @@queue.push -> do
- Rails.logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}")
- Tweet.delete_from_receiver(msg)
- end
- end
end
end
end
diff --git a/lib/aclog/receiver/register_server.rb b/lib/aclog/receiver/register_server.rb
index 541efd5..f5b6060 100644
--- a/lib/aclog/receiver/register_server.rb
+++ b/lib/aclog/receiver/register_server.rb
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
module Aclog
module Receiver
class RegisterServer
diff --git a/lib/aclog/receiver/worker.rb b/lib/aclog/receiver/worker.rb
index 9896853..db60951 100644
--- a/lib/aclog/receiver/worker.rb
+++ b/lib/aclog/receiver/worker.rb
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
require "msgpack/rpc/transport/unix"
module Aclog
@@ -17,9 +16,12 @@ module Aclog
Rails.logger.info("Receiver started")
File.delete(_sock_path) if File.exists?(_sock_path)
EM.run do
+ channel = EM::Channel.new
+ EM.defer { channel.subscribe(&:call) }
+
connections = {}
- collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, connections)
+ collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, channel, connections)
reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(_sock_path)
register_server = MessagePack::RPC::Server.new