diff options
author | re4k <re4k@re4k.info> | 2013-03-21 01:39:17 +0900 |
---|---|---|
committer | re4k <re4k@re4k.info> | 2013-03-21 01:39:17 +0900 |
commit | 773d8610cda565a0bd395f16d94bbda912273237 (patch) | |
tree | 8bfaa08bfc163a69ba7da820122079abf3db62c8 /lib | |
parent | d00c4cf7f73c7c2cd46eac32b9bbc01e19d982c4 (diff) | |
download | aclog-773d8610cda565a0bd395f16d94bbda912273237.tar.gz |
Replace JSON with MessagePack
Diffstat (limited to 'lib')
-rw-r--r-- | lib/receiver/worker.rb | 311 |
1 files changed, 168 insertions, 143 deletions
diff --git a/lib/receiver/worker.rb b/lib/receiver/worker.rb index 8c44c76..b22d410 100644 --- a/lib/receiver/worker.rb +++ b/lib/receiver/worker.rb @@ -2,206 +2,231 @@ require "time" module EM class Connection - def send_chunk(data) - send_data(data + "\r\n") + def send_object(data) + send_data(data.to_msgpack) end end end class Receiver::Worker < DaemonSpawn::Base class DBProxyServer < EM::Connection - $worker_count = nil @@wq = EM::WorkQueue::WorkQueue.new do |arg| - begin - begin - json = ::Yajl::Parser.parse(arg.last, :symbolize_keys => true) - rescue ::Yajl::ParseError - # JSON parse error....?? - p $! + arg.call + end + @@wq.start + + def send_account_all + Account.where("id % ? = ?", Settings.worker_count, @worker_number).each do |account| + puts "Sent #{account.id}/#{account.user_id}" + send_account(account) + end + end + + def send_account(account) + out = {:type => "account", + :id => account.id, + :oauth_token => account.oauth_token, + :oauth_token_secret => account.oauth_token_secret, + :user_id => account.user_id} + send_object(out) + end + + def initialize + $connections ||= {} + @worker_number = nil + @pac = MessagePack::Unpacker.new + end + + def post_init + # なにもしない。クライアントが + end + + def unbind + $connections.delete_if{|k, v| v == self} + $logger.info("Connection closed: #{@worker_number}") + end + + def receive_data(data) + @pac.feed_each(data) do |msg| + unless msg["type"] + $logger.error("???: #{msg}") + send_object({:type => "fatal", :message => "Unknown data"}) + close_connection_after_writing + return + end + + if msg["type"] != "init" && !@authorized + $logger.error("Not authorized client: #{msg}") + send_object({:type => "fatal", :message => "You aren't authorized"}) + close_connection_after_writing + return + end + + case msg["type"] + when "init" + receive_init(msg) + when "unauthorized" + receive_unauthorized(msg) + when "user" + receive_user(msg) + when "tweet" + receive_tweet(msg) + when "favorite" + receive_favorite(msg) + when "retweet" + receive_retweet(msg) + when "delete" + receive_delete(msg) + when "quit" + receive_quit(msg) + else + $logger.warn("Unknown message type: #{msg["type"]}") + send_object({:type => "error", :message => "Unknown message type: #{msg["type"]}"}) + end end + end - case arg.first - when "USER" + def receive_init(msg) + secret_key = msg["secret_key"] + worker_number = msg["worker_number"] + unless secret_key == Settings.secret_key + $logger.error("Invalid secret_key: #{secret_key}") + send_object({:type => "fatal", :message => "Invalid secret_key"}) + close_connection_after_writing + return + end + $connections[worker_number] = self + @worker_number = worker_number + @authorized = true + $logger.info("Connected: #{worker_number}") + send_object({:type => "ok", :message => "Connected"}) + send_account_all + end + + def receive_unauthorized(msg) + $logger.warn("Unauthorized: #{msg["user_id"]}") + # unregister + end + + def receive_user(msg) + @@wq.push -> do $logger.debug("Received User") - rec = User.find_or_initialize_by(:id => json[:id]) - rec.screen_name = json[:screen_name] - rec.name = json[:name] - rec.profile_image_url = json[:profile_image_url] + rec = User.find_or_initialize_by(:id => msg["id"]) + rec.screen_name = msg["screen_name"] + rec.name = msg["name"] + rec.profile_image_url = msg["profile_image_url"] rec.save! if rec.changed? - when "TWEET" + end + end + + def receive_tweet(msg) + @@wq.push -> do $logger.debug("Received Tweet") begin - Tweet.create!(:id => json[:id], - :text => json[:text], - :source => json[:source], - :tweeted_at => Time.parse(json[:tweeted_at]), - :user_id => json[:user_id]) + Tweet.create!(:id => msg["id"], + :text => msg["text"], + :source => msg["source"], + :tweeted_at => Time.parse(msg["tweeted_at"]), + :user_id => msg["user_id"]) $logger.debug("Saved Tweet") rescue ActiveRecord::RecordNotUnique $logger.info("Can't Save Tweet: Duplicate") end - when "FAVORITE" + end + end + + def receive_favorite(msg) + @@wq.push -> do $logger.debug("Received Favorite") begin - Favorite.create!(:tweet_id => json[:tweet_id], - :user_id => json[:user_id]) + Favorite.create!(:tweet_id => msg["tweet_id"], + :user_id => msg["user_id"]) $logger.debug("Saved Favorite") rescue ActiveRecord::RecordNotUnique $logger.info("Can't Save Tweet: Duplicate") end - when "UNFAVORITE" - Favorite - .where("tweet_id = #{json[:tweet_id]} AND user_id = #{json[:user_id]}") - .destroy_all - when "RETWEET" + end + end + + def receive_retweet(msg) + @@wq.push -> do $logger.debug("Received Retweet") begin - Retweet.create!(:id => json[:id], - :tweet_id => json[:tweet_id], - :user_id => json[:user_id]) + Retweet.create!(:id => msg["id"], + :tweet_id => msg["tweet_id"], + :user_id => msg["user_id"]) $logger.debug("Saved Retweet") rescue ActiveRecord::RecordNotUnique $logger.info("Can't Save Retweet: Duplicate") end - when "DELETE" - tweet = Tweet.find_by(:id => json[:tweet_id]) || Retweet.find_by(:id => json[:tweet_id]) - if tweet - tweet.destroy - end - else - # ??? - puts "???????" end - rescue - $logger.error($!) - $logger.error($@) - end - end - @@wq.start - - def initialize - @worker_number = nil - @receive_buf = "" end - def post_init - # なにもしない。クライアントが - end - - def unbind - $connections.delete_if{|k, v| v == self} - $logger.info("Connection closed: #{@worker_number}") - end - - def send_account_all - Account.where("id % ? = ?", $worker_count, @worker_number).each do |account| - puts "Sent #{account.id}/#{account.user_id}" - send_account(account) + def receive_delete(msg) + @@wq.push -> do + if msg["id"] + Tweet.where(:id => msg["id"]).destroy_all + Retweet.where(:id => msg["id"]).destroy_all + elsif msg["tweet_id"] + Favorite + .where("tweet_id = #{msg["tweet_id"]} AND user_id = #{msg["user_id"]}") + .destroy_all + end end end - def send_account(account) - send_chunk("ACCOUNT #{Yajl::Encoder.encode(account.attributes)}") - end - - def receive_data(data) - @receive_buf << data - while line = @receive_buf.slice!(/.+?\r\n/) - line.chomp! - next if line == "" - arg = line.split(/ /, 2) - case arg.first - when "CONNECT" - begin - json = ::Yajl::Parser.parse(arg.last, :symbolize_keys => true) - rescue ::Yajl::ParseError - # JSON parse error....?? - p $! - end - secret_key = json[:secret_key] - worker_number = json[:worker_number] - worker_count = json[:worker_count] - if secret_key == Settings.secret_key - if $worker_count != worker_count && $connections.size > 0 - $logger.error("Error: Worker Count Difference: $worker_count=#{$worker_count}, worker_count=#{worker_count}") - send_chunk("ERROR Invalid Worker Count") - close_connection_after_writing - else - $worker_count = worker_count - $connections[worker_number] = self - @worker_number = worker_number - @authorized = true - $logger.info("Connected: #{worker_number}") - send_chunk("OK Connected") - send_account_all - end - else - $logger.error("Error: Invalid Secret Key") - send_chunk("ERROR Invalid Secret Key") - close_connection_after_writing - end - when "UNAUTHORIZED" - $logger.warn("Unauthorized: #{arg.last}") - # unregister - when "QUIT" - $logger.info("Quit: #{@worker_number}") - send_chunk("BYE") - close_connection_after_writing - else - if @authorized - @@wq.push arg - end - end - end + def receive_quit(msg) + $logger.warn("Quit: #{@worker_number}") + send_data({:type => "ok", :message => "Bye"}) + close_connection_after_writing end end class RegisterServer < EM::Connection def initialize - @receive_buf = "" + @pac = MessagePack::Unpacker.new end def post_init end def receive_data(data) - @receive_buf << data - while line = @receive_buf.slice!(/.+?\r\n/) - line.chomp! - next if line == "" - p line - sp = line.split(/ /, 2) - if sp.first == "REGISTER" - if sp.last =~ /^[0-9]+$/ - account = Account.find_by(:id => sp.last.to_i) - if account - if con = $connections[account.id % $worker_count] - con.send_account(account) - send_chunk("OK Registered") - else - send_chunk("OK Worker not found") - end + @pac.feed_each(data) do |msg| + p msg + unless msg["type"] + $logger.error("Unknown message") + send_object({:type => "fatal", :message => "Unknown message"}) + close_connection_after_writing + return + end + + case msg["type"] + when "register" + account = Account.where(:id => msg["id"]).first + if account + if con = $connections[account.id % Settings.worker_count] + con.send_account(account) + send_object({:type => "ok", :message => "Registered"}) + $logger.info("Account registered and sent") else - $logger.error("Unknown account: #{sp.last}") - send_chunk("ERROR Unknown Account") + send_object({:type => "ok", :message => "Registered but not started"}) + $logger.info("Account registered") end else - $logger.error("Invalid User ID") - send_chunk("ERROR Invalid User ID") + $logger.error("Unknown account id") + send_object({:type => "error", :message => "Unknown account id"}) end + close_connection_after_writing else - $logger.error("Unknown Command: #{sp})") - send_chunk("ERROR Unknown command") + $logger.warn("Unknown register command: #{msg["type"]}") end - close_connection_after_writing - return end end end def initialize(opts = {}) - super(opts) - $logger = Receiver::Logger.new(:warn) + #super(opts) + $logger = Receiver::Logger.new(:debug) $connections = {} end |