aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorre4k <re4k@re4k.info>2013-03-21 01:39:17 +0900
committerre4k <re4k@re4k.info>2013-03-21 01:39:17 +0900
commit773d8610cda565a0bd395f16d94bbda912273237 (patch)
tree8bfaa08bfc163a69ba7da820122079abf3db62c8 /lib
parentd00c4cf7f73c7c2cd46eac32b9bbc01e19d982c4 (diff)
downloadaclog-773d8610cda565a0bd395f16d94bbda912273237.tar.gz
Replace JSON with MessagePack
Diffstat (limited to 'lib')
-rw-r--r--lib/receiver/worker.rb311
1 files changed, 168 insertions, 143 deletions
diff --git a/lib/receiver/worker.rb b/lib/receiver/worker.rb
index 8c44c76..b22d410 100644
--- a/lib/receiver/worker.rb
+++ b/lib/receiver/worker.rb
@@ -2,206 +2,231 @@ require "time"
module EM
class Connection
- def send_chunk(data)
- send_data(data + "\r\n")
+ def send_object(data)
+ send_data(data.to_msgpack)
end
end
end
class Receiver::Worker < DaemonSpawn::Base
class DBProxyServer < EM::Connection
- $worker_count = nil
@@wq = EM::WorkQueue::WorkQueue.new do |arg|
- begin
- begin
- json = ::Yajl::Parser.parse(arg.last, :symbolize_keys => true)
- rescue ::Yajl::ParseError
- # JSON parse error....??
- p $!
+ arg.call
+ end
+ @@wq.start
+
+ def send_account_all
+ Account.where("id % ? = ?", Settings.worker_count, @worker_number).each do |account|
+ puts "Sent #{account.id}/#{account.user_id}"
+ send_account(account)
+ end
+ end
+
+ def send_account(account)
+ out = {:type => "account",
+ :id => account.id,
+ :oauth_token => account.oauth_token,
+ :oauth_token_secret => account.oauth_token_secret,
+ :user_id => account.user_id}
+ send_object(out)
+ end
+
+ def initialize
+ $connections ||= {}
+ @worker_number = nil
+ @pac = MessagePack::Unpacker.new
+ end
+
+ def post_init
+ # なにもしない。クライアントが
+ end
+
+ def unbind
+ $connections.delete_if{|k, v| v == self}
+ $logger.info("Connection closed: #{@worker_number}")
+ end
+
+ def receive_data(data)
+ @pac.feed_each(data) do |msg|
+ unless msg["type"]
+ $logger.error("???: #{msg}")
+ send_object({:type => "fatal", :message => "Unknown data"})
+ close_connection_after_writing
+ return
+ end
+
+ if msg["type"] != "init" && !@authorized
+ $logger.error("Not authorized client: #{msg}")
+ send_object({:type => "fatal", :message => "You aren't authorized"})
+ close_connection_after_writing
+ return
+ end
+
+ case msg["type"]
+ when "init"
+ receive_init(msg)
+ when "unauthorized"
+ receive_unauthorized(msg)
+ when "user"
+ receive_user(msg)
+ when "tweet"
+ receive_tweet(msg)
+ when "favorite"
+ receive_favorite(msg)
+ when "retweet"
+ receive_retweet(msg)
+ when "delete"
+ receive_delete(msg)
+ when "quit"
+ receive_quit(msg)
+ else
+ $logger.warn("Unknown message type: #{msg["type"]}")
+ send_object({:type => "error", :message => "Unknown message type: #{msg["type"]}"})
+ end
end
+ end
- case arg.first
- when "USER"
+ def receive_init(msg)
+ secret_key = msg["secret_key"]
+ worker_number = msg["worker_number"]
+ unless secret_key == Settings.secret_key
+ $logger.error("Invalid secret_key: #{secret_key}")
+ send_object({:type => "fatal", :message => "Invalid secret_key"})
+ close_connection_after_writing
+ return
+ end
+ $connections[worker_number] = self
+ @worker_number = worker_number
+ @authorized = true
+ $logger.info("Connected: #{worker_number}")
+ send_object({:type => "ok", :message => "Connected"})
+ send_account_all
+ end
+
+ def receive_unauthorized(msg)
+ $logger.warn("Unauthorized: #{msg["user_id"]}")
+ # unregister
+ end
+
+ def receive_user(msg)
+ @@wq.push -> do
$logger.debug("Received User")
- rec = User.find_or_initialize_by(:id => json[:id])
- rec.screen_name = json[:screen_name]
- rec.name = json[:name]
- rec.profile_image_url = json[:profile_image_url]
+ rec = User.find_or_initialize_by(:id => msg["id"])
+ rec.screen_name = msg["screen_name"]
+ rec.name = msg["name"]
+ rec.profile_image_url = msg["profile_image_url"]
rec.save! if rec.changed?
- when "TWEET"
+ end
+ end
+
+ def receive_tweet(msg)
+ @@wq.push -> do
$logger.debug("Received Tweet")
begin
- Tweet.create!(:id => json[:id],
- :text => json[:text],
- :source => json[:source],
- :tweeted_at => Time.parse(json[:tweeted_at]),
- :user_id => json[:user_id])
+ Tweet.create!(:id => msg["id"],
+ :text => msg["text"],
+ :source => msg["source"],
+ :tweeted_at => Time.parse(msg["tweeted_at"]),
+ :user_id => msg["user_id"])
$logger.debug("Saved Tweet")
rescue ActiveRecord::RecordNotUnique
$logger.info("Can't Save Tweet: Duplicate")
end
- when "FAVORITE"
+ end
+ end
+
+ def receive_favorite(msg)
+ @@wq.push -> do
$logger.debug("Received Favorite")
begin
- Favorite.create!(:tweet_id => json[:tweet_id],
- :user_id => json[:user_id])
+ Favorite.create!(:tweet_id => msg["tweet_id"],
+ :user_id => msg["user_id"])
$logger.debug("Saved Favorite")
rescue ActiveRecord::RecordNotUnique
$logger.info("Can't Save Tweet: Duplicate")
end
- when "UNFAVORITE"
- Favorite
- .where("tweet_id = #{json[:tweet_id]} AND user_id = #{json[:user_id]}")
- .destroy_all
- when "RETWEET"
+ end
+ end
+
+ def receive_retweet(msg)
+ @@wq.push -> do
$logger.debug("Received Retweet")
begin
- Retweet.create!(:id => json[:id],
- :tweet_id => json[:tweet_id],
- :user_id => json[:user_id])
+ Retweet.create!(:id => msg["id"],
+ :tweet_id => msg["tweet_id"],
+ :user_id => msg["user_id"])
$logger.debug("Saved Retweet")
rescue ActiveRecord::RecordNotUnique
$logger.info("Can't Save Retweet: Duplicate")
end
- when "DELETE"
- tweet = Tweet.find_by(:id => json[:tweet_id]) || Retweet.find_by(:id => json[:tweet_id])
- if tweet
- tweet.destroy
- end
- else
- # ???
- puts "???????"
end
- rescue
- $logger.error($!)
- $logger.error($@)
- end
- end
- @@wq.start
-
- def initialize
- @worker_number = nil
- @receive_buf = ""
end
- def post_init
- # なにもしない。クライアントが
- end
-
- def unbind
- $connections.delete_if{|k, v| v == self}
- $logger.info("Connection closed: #{@worker_number}")
- end
-
- def send_account_all
- Account.where("id % ? = ?", $worker_count, @worker_number).each do |account|
- puts "Sent #{account.id}/#{account.user_id}"
- send_account(account)
+ def receive_delete(msg)
+ @@wq.push -> do
+ if msg["id"]
+ Tweet.where(:id => msg["id"]).destroy_all
+ Retweet.where(:id => msg["id"]).destroy_all
+ elsif msg["tweet_id"]
+ Favorite
+ .where("tweet_id = #{msg["tweet_id"]} AND user_id = #{msg["user_id"]}")
+ .destroy_all
+ end
end
end
- def send_account(account)
- send_chunk("ACCOUNT #{Yajl::Encoder.encode(account.attributes)}")
- end
-
- def receive_data(data)
- @receive_buf << data
- while line = @receive_buf.slice!(/.+?\r\n/)
- line.chomp!
- next if line == ""
- arg = line.split(/ /, 2)
- case arg.first
- when "CONNECT"
- begin
- json = ::Yajl::Parser.parse(arg.last, :symbolize_keys => true)
- rescue ::Yajl::ParseError
- # JSON parse error....??
- p $!
- end
- secret_key = json[:secret_key]
- worker_number = json[:worker_number]
- worker_count = json[:worker_count]
- if secret_key == Settings.secret_key
- if $worker_count != worker_count && $connections.size > 0
- $logger.error("Error: Worker Count Difference: $worker_count=#{$worker_count}, worker_count=#{worker_count}")
- send_chunk("ERROR Invalid Worker Count")
- close_connection_after_writing
- else
- $worker_count = worker_count
- $connections[worker_number] = self
- @worker_number = worker_number
- @authorized = true
- $logger.info("Connected: #{worker_number}")
- send_chunk("OK Connected")
- send_account_all
- end
- else
- $logger.error("Error: Invalid Secret Key")
- send_chunk("ERROR Invalid Secret Key")
- close_connection_after_writing
- end
- when "UNAUTHORIZED"
- $logger.warn("Unauthorized: #{arg.last}")
- # unregister
- when "QUIT"
- $logger.info("Quit: #{@worker_number}")
- send_chunk("BYE")
- close_connection_after_writing
- else
- if @authorized
- @@wq.push arg
- end
- end
- end
+ def receive_quit(msg)
+ $logger.warn("Quit: #{@worker_number}")
+ send_data({:type => "ok", :message => "Bye"})
+ close_connection_after_writing
end
end
class RegisterServer < EM::Connection
def initialize
- @receive_buf = ""
+ @pac = MessagePack::Unpacker.new
end
def post_init
end
def receive_data(data)
- @receive_buf << data
- while line = @receive_buf.slice!(/.+?\r\n/)
- line.chomp!
- next if line == ""
- p line
- sp = line.split(/ /, 2)
- if sp.first == "REGISTER"
- if sp.last =~ /^[0-9]+$/
- account = Account.find_by(:id => sp.last.to_i)
- if account
- if con = $connections[account.id % $worker_count]
- con.send_account(account)
- send_chunk("OK Registered")
- else
- send_chunk("OK Worker not found")
- end
+ @pac.feed_each(data) do |msg|
+ p msg
+ unless msg["type"]
+ $logger.error("Unknown message")
+ send_object({:type => "fatal", :message => "Unknown message"})
+ close_connection_after_writing
+ return
+ end
+
+ case msg["type"]
+ when "register"
+ account = Account.where(:id => msg["id"]).first
+ if account
+ if con = $connections[account.id % Settings.worker_count]
+ con.send_account(account)
+ send_object({:type => "ok", :message => "Registered"})
+ $logger.info("Account registered and sent")
else
- $logger.error("Unknown account: #{sp.last}")
- send_chunk("ERROR Unknown Account")
+ send_object({:type => "ok", :message => "Registered but not started"})
+ $logger.info("Account registered")
end
else
- $logger.error("Invalid User ID")
- send_chunk("ERROR Invalid User ID")
+ $logger.error("Unknown account id")
+ send_object({:type => "error", :message => "Unknown account id"})
end
+ close_connection_after_writing
else
- $logger.error("Unknown Command: #{sp})")
- send_chunk("ERROR Unknown command")
+ $logger.warn("Unknown register command: #{msg["type"]}")
end
- close_connection_after_writing
- return
end
end
end
def initialize(opts = {})
- super(opts)
- $logger = Receiver::Logger.new(:warn)
+ #super(opts)
+ $logger = Receiver::Logger.new(:debug)
$connections = {}
end