aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorrhenium <re4k@re4k.info>2013-05-11 13:36:13 +0900
committerrhenium <re4k@re4k.info>2013-05-11 13:36:13 +0900
commite5accc8e74bf2a6f66638b1085ee7cc85abd281c (patch)
tree33399d3b0742dca1f79fa51eda0a48ba10d1b471 /lib
parente05734159a0166664f5fc62ddf13c3a7624836f9 (diff)
downloadaclog-e5accc8e74bf2a6f66638b1085ee7cc85abd281c.tar.gz
use MessagePack-RPC in Login - Daemon
Diffstat (limited to 'lib')
-rw-r--r--lib/aclog/receiver/collector_server.rb106
-rw-r--r--lib/aclog/receiver/worker.rb71
2 files changed, 78 insertions, 99 deletions
diff --git a/lib/aclog/receiver/collector_server.rb b/lib/aclog/receiver/collector_server.rb
index 3aee3bc..1130a25 100644
--- a/lib/aclog/receiver/collector_server.rb
+++ b/lib/aclog/receiver/collector_server.rb
@@ -1,50 +1,34 @@
# -*- coding: utf-8 -*-
+require "time"
+
module Aclog
module Receiver
class CollectorServer < EM::Connection
- def send_account_all
- Account.where("id % ? = ?", Settings.worker_count, @worker_number).each do |account|
- send_account(account)
- end
- end
-
- def send_account(account)
- out = {:type => "account",
- :id => account.id,
- :oauth_token => account.oauth_token,
- :oauth_token_secret => account.oauth_token_secret,
- :user_id => account.user_id,
- :consumer_version => account.consumer_version.to_i}
- send_object(out)
- Rails.logger.debug("Sent #{account.id}/#{account.user_id}")
- end
-
- def self.send_account(account)
- if con = @@connections[account.id % Settings.worker_count]
- con.send_account(account)
- end
- end
-
- def initialize
- @@connections ||= {}
+ def initialize(connections)
+ @connections = connections
@worker_number = nil
@pac = MessagePack::Unpacker.new
- @@saved_tweets ||= []
- unless defined?(@@wq)
- @@wq = EM::Queue.new # ふぁぼ以外
- EM.defer do
- wcb = -> msg{msg.call; @@wq.pop &wcb}
- @@wq.pop &wcb
- end
+ unless defined? @@queue
+ @@queue = EM::Queue.new
- @@nq = EM::Queue.new # 通知するやつ(ふぁぼ)
- EM.defer do
- ncb = -> msg{msg.call; @@nq.pop &ncb}
- @@nq.pop &ncb
- end
+ _cr = -> bl { bl.call; @@queue.pop &_cr }
+ EM.defer { @@queue.pop &_cr }
end
+
+ @@saved_tweets ||= []
+ end
+
+ def send_account(account)
+ send_object(
+ type: "account",
+ id: account.id,
+ oauth_token: account.oauth_token,
+ oauth_token_secret: account.oauth_token_secret,
+ user_id: account.user_id,
+ consumer_version: account.consumer_version)
+ Rails.logger.debug("Sent #{account.id}/#{account.user_id}")
end
def post_init
@@ -53,21 +37,21 @@ module Aclog
def unbind
Rails.logger.info("Connection closed(#{@worker_number})")
- @@connections.delete_if{|k, v| v == self}
+ @connections.delete_if{|k, v| v == self}
end
def receive_data(data)
@pac.feed_each(data) do |msg|
unless msg.is_a?(Hash) && msg["type"]
Rails.logger.warn("Unknown data: #{msg}")
- send_object({:type => "fatal", :message => "Unknown data"})
+ send_object(type: "fatal", message: "Unknown data")
close_connection_after_writing
return
end
- if msg["type"] != "init" && !@authorized
+ if not @authorized and not msg["type"] == "init"
Rails.logger.warn("Not authorized client: #{msg}")
- send_object({:type => "fatal", :message => "You aren't authorized"})
+ send_object(type: "fatal", message: "You aren't authorized")
close_connection_after_writing
return
end
@@ -90,41 +74,49 @@ module Aclog
when "spam"
receive_spam(msg)
when "quit"
- # Heroku の cycling など
Rails.logger.info("Quit(#{@worker_number}): #{msg["reason"]}")
- send_data({:type => "quit", :message => "Bye"})
+ send_data(type: "quit", message: "Bye")
close_connection_after_writing
else
Rails.logger.warn("Unknown message type(#{@worker_number}): #{msg["type"]}")
- send_object({:type => "error", :message => "Unknown message type: #{msg["type"]}"})
+ send_object(type: "error", message: "Unknown message type: #{msg["type"]}")
end
end
end
+ private
+ def send_object(data)
+ send_data(data.to_msgpack)
+ end
+
def receive_init(msg)
secret_key = msg["secret_key"]
worker_number = msg["worker_number"]
unless secret_key == Settings.secret_key
Rails.logger.warn("Invalid secret_key(?:#{worker_number}): \"#{secret_key}\"")
- send_object({:type => "fatal", :message => "Invalid secret_key"})
+ send_object(type: "fatal", message: "Invalid secret_key")
close_connection_after_writing
return
end
if worker_number > Settings.worker_count
Rails.logger.warn("Invalid worker_number: #{worker_number}, secret_key: \"#{secret_key}\"")
- send_object({:type => "fatal", :message => "Invalid worker_number"})
+ send_object(type: "fatal", message: "Invalid worker_number")
close_connection_after_writing
return
end
- if @@connections[worker_number]
- @@connections[worker_number].close_connection
+
+ if @connections[worker_number]
+ @connections[worker_number].close_connection
end
- @@connections[worker_number] = self
+ @connections[worker_number] = self
@worker_number = worker_number
@authorized = true
Rails.logger.info("Connected(#{@worker_number})")
- send_object({:type => "ok", :message => "Connected"})
- send_account_all
+ send_object(type: "ok", message: "Connected")
+
+ Account.where("id % ? = ?", Settings.worker_count, @worker_number).each do |account|
+ send_account(account)
+ end
end
def receive_unauthorized(msg)
@@ -133,7 +125,7 @@ module Aclog
end
def receive_user(msg)
- @@wq.push -> do
+ @@queue.push -> do
Rails.logger.debug("Received User(#{@worker_number}): #{msg["id"]}")
User.from_hash(:id => msg["id"],
:screen_name => msg["screen_name"],
@@ -144,7 +136,7 @@ module Aclog
end
def receive_tweet(msg)
- @@wq.push -> do
+ @@queue.push -> do
Rails.logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}")
unless @@saved_tweets.include?(msg["id"])
@@saved_tweets << msg["id"]
@@ -164,7 +156,7 @@ module Aclog
end
def receive_favorite(msg)
- @@nq.push -> do
+ @@queue.push -> do
Rails.logger.debug("Receive Favorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
f = Favorite.from_hash(:tweet_id => msg["tweet_id"],
:user_id => msg["user_id"])
@@ -175,7 +167,7 @@ module Aclog
end
def receive_retweet(msg)
- @@wq.push -> do
+ @@queue.push -> do
Rails.logger.debug("Receive Retweet(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
Retweet.from_hash(:id => msg["id"],
:tweet_id => msg["tweet_id"],
@@ -184,7 +176,7 @@ module Aclog
end
def receive_delete(msg)
- @@wq.push -> do
+ @@queue.push -> do
if msg["id"]
Rails.logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}")
Tweet.delete_from_id(msg["id"])
@@ -198,7 +190,7 @@ module Aclog
def receive_spam(msg)
Rails.logger.info("Receive Spam(#{@worker_number}): #{msg["id"]}")
- # @@wq.push -> do
+ # @@queue.push -> do
# # TODO
# end
end
diff --git a/lib/aclog/receiver/worker.rb b/lib/aclog/receiver/worker.rb
index 7f5e5e9..fe383e2 100644
--- a/lib/aclog/receiver/worker.rb
+++ b/lib/aclog/receiver/worker.rb
@@ -1,49 +1,28 @@
# -*- coding: utf-8 -*-
-require "time"
+require "msgpack/rpc/transport/unix"
-module EM
- class Connection
- def send_object(data)
- send_data(data.to_msgpack)
- end
- end
-end
module Aclog
module Receiver
class Worker < DaemonSpawn::Base
- class RegisterServer < EM::Connection
- def initialize
- @pac = MessagePack::Unpacker.new
+ class RegisterServer
+ def initialize(connections)
+ @connections = connections
end
- def post_init
+ def register(account_)
+ account = Marshal.load(account_)
+ con_num = account.id % Settings.worker_count
+ con = @connections[con_num]
+ if con
+ con.send_account(account)
+ Rails.logger.info("Sent account: connection_number: #{con_num} / account_id: #{account.id}")
+ else
+ Rails.logger.info("Connection not found: connection_number: #{con_num} / account_id: #{account.id}")
+ end
end
- def receive_data(data)
- @pac.feed_each(data) do |msg|
- Rails.logger.debug(msg.to_s)
- unless msg["type"]
- Rails.logger.error("Unknown message")
- send_object({:type => "fatal", :message => "Unknown message"})
- close_connection_after_writing
- return
- end
-
- case msg["type"]
- when "register"
- account = Account.where(:id => msg["id"]).first
- if account
- Aclog::Receiver::CollectorServer.send_account(account)
- Rails.logger.info("Account registered and sent")
- else
- Rails.logger.error("Unknown account id")
- send_object({:type => "error", :message => "Unknown account id"})
- end
- close_connection_after_writing
- else
- Rails.logger.warn("Unknown register command: #{msg["type"]}")
- end
- end
+ def unregister(account)
+ # TODO
end
end
@@ -55,17 +34,25 @@ module Aclog
end
def start(args)
- Rails.logger.info("Database Proxy Started")
+ Rails.logger.info("Receiver started")
EM.run do
- o = EM.start_server("0.0.0.0", Settings.listen_port, Aclog::Receiver::CollectorServer)
- i = EM.start_unix_domain_server(File.join(Rails.root, "tmp", "sockets", "receiver.sock"), RegisterServer)
+ connections = {}
+
+ collector_server = EM.start_server("0.0.0.0", Settings.listen_port, Aclog::Receiver::CollectorServer, connections)
+
+ reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(File.join(Rails.root, "tmp", "sockets", "receiver.sock"))
+ register_server = MessagePack::RPC::Server.new
+ register_server.listen(reg_svr_listener, RegisterServer.new(connections))
+ EM.defer { register_server.run }
stop = Proc.new do
- EM.stop_server(o)
- EM.stop_server(i)
+ EM.stop_server(collector_server)
+ register_server.close
+ File.delete(File.join(Rails.root, "tmp", "sockets", "receiver.sock"))
EM.stop
end
Signal.trap(:INT, &stop)
+ Signal.trap(:TERM, &stop)
end
end