aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorre4k <re4k@re4k.info>2013-04-03 16:37:31 +0900
committerre4k <re4k@re4k.info>2013-04-03 16:37:31 +0900
commit532f0445a5812259a547cda32ac78ebd79d2bc77 (patch)
tree6f5d2abf9ee90419f46e8c012f25041c52447d21 /lib
parentc5290eafc803266f1d7f48f7c229f6024b3dbaa6 (diff)
downloadaclog-532f0445a5812259a547cda32ac78ebd79d2bc77.tar.gz
auto pager, refactor models, favs notification
Diffstat (limited to 'lib')
-rw-r--r--lib/aclog/notification.rb24
-rw-r--r--lib/receiver/worker.rb141
2 files changed, 102 insertions, 63 deletions
diff --git a/lib/aclog/notification.rb b/lib/aclog/notification.rb
new file mode 100644
index 0000000..b2a89ba
--- /dev/null
+++ b/lib/aclog/notification.rb
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+module Aclog
+ module Notification
+ def self.reply_favs(tweet, count)
+ reply_tweet(tweet.user, "#{count}favs!", tweet)
+ end
+
+ private
+ def self.reply_tweet(user, text, tweet)
+ @@account ||= Twitter::Client.new(consumer_key: Settings.notification.consumer.key,
+ consumer_secret: Settings.notification.consumer.secret,
+ oauth_token: Settings.notification.token.token,
+ oauth_token_secret: Settings.notification.token.secret)
+
+ url = Settings.tweet_url.gsub(/:id/, tweet.id.to_s)
+ begin
+ @@account.update("@#{user.screen_name} #{text} #{url}", :in_reply_to_status_id => tweet.id)
+ rescue Exception
+ logger.error($!)
+ logger.error($@)
+ end
+ end
+ end
+end
diff --git a/lib/receiver/worker.rb b/lib/receiver/worker.rb
index 345ad73..5387a8c 100644
--- a/lib/receiver/worker.rb
+++ b/lib/receiver/worker.rb
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
require "time"
module EM
@@ -12,7 +13,6 @@ class Receiver::Worker < DaemonSpawn::Base
class DBProxyServer < EM::Connection
def send_account_all
Account.where("id % ? = ?", Settings.worker_count, @worker_number).each do |account|
- puts "Sent #{account.id}/#{account.user_id}"
send_account(account)
end
end
@@ -25,20 +25,34 @@ class Receiver::Worker < DaemonSpawn::Base
:user_id => account.user_id,
:consumer_version => account.consumer_version.to_i}
send_object(out)
+ Rails.logger.debug("Sent #{account.id}/#{account.user_id}")
+ end
+
+ def self.send_account(account)
+ if con = @@connections[account.id % Settings.worker_count]
+ con.send_account(account)
+ end
end
def initialize
- $connections ||= {}
+ @@connections ||= {}
@worker_number = nil
@pac = MessagePack::Unpacker.new
@@saved_tweets ||= []
unless defined?(@@wq)
- @@wq = EM::WorkQueue::WorkQueue.new do |arg|
- arg.call
+ @@wq = EM::Queue.new # ふぁぼ以外
+ EM.defer do
+ wcb = -> msg{msg.call; @@wq.pop &wcb}
+ @@wq.pop &wcb
+ end
+
+ @@nq = EM::Queue.new # 通知するやつ(ふぁぼ)
+ EM.defer do
+ ncb = -> msg{msg.call; @@nq.pop &ncb}
+ @@nq.pop &ncb
end
- @@wq.start
end
end
@@ -47,21 +61,21 @@ class Receiver::Worker < DaemonSpawn::Base
end
def unbind
- $connections.delete_if{|k, v| v == self}
- $logger.info("Connection closed(#{@worker_number})")
+ Rails.logger.info("Connection closed(#{@worker_number})")
+ @@connections.delete_if{|k, v| v == self}
end
def receive_data(data)
@pac.feed_each(data) do |msg|
unless msg.is_a?(Hash) && msg["type"]
- $logger.error("???(#{@worker_number}): #{msg}")
+ Rails.logger.warn("Unknown data: #{msg}")
send_object({:type => "fatal", :message => "Unknown data"})
close_connection_after_writing
return
end
if msg["type"] != "init" && !@authorized
- $logger.error("Not authorized client: #{msg}")
+ Rails.logger.warn("Not authorized client: #{msg}")
send_object({:type => "fatal", :message => "You aren't authorized"})
close_connection_after_writing
return
@@ -83,9 +97,12 @@ class Receiver::Worker < DaemonSpawn::Base
when "delete"
receive_delete(msg)
when "quit"
- receive_quit(msg)
+ # Heroku の cycling など
+ Rails.logger.info("Quit(#{@worker_number}): #{msg["reason"]}")
+ send_data({:type => "quit", :message => "Bye"})
+ close_connection_after_writing
else
- $logger.warn("Unknown message type(#{@worker_number}): #{msg["type"]}")
+ Rails.logger.warn("Unknown message type(#{@worker_number}): #{msg["type"]}")
send_object({:type => "error", :message => "Unknown message type: #{msg["type"]}"})
end
end
@@ -95,27 +112,36 @@ class Receiver::Worker < DaemonSpawn::Base
secret_key = msg["secret_key"]
worker_number = msg["worker_number"]
unless secret_key == Settings.secret_key
- $logger.error("Invalid secret_key(?:#{worker_number}): #{secret_key}")
+ Rails.logger.warn("Invalid secret_key(?:#{worker_number}): \"#{secret_key}\"")
send_object({:type => "fatal", :message => "Invalid secret_key"})
close_connection_after_writing
return
end
- $connections[worker_number] = self
+ if worker_number > Settings.worker_count
+ Rails.logger.warn("Invalid worker_number: #{worker_number}, secret_key: \"#{secret_key}\"")
+ send_object({:type => "fatal", :message => "Invalid worker_number"})
+ close_connection_after_writing
+ return
+ end
+ if @@connections[worker_number]
+ @@connections[worker_number].close
+ end
+ @@connections[worker_number] = self
@worker_number = worker_number
@authorized = true
- $logger.info("Connected(#{@worker_number})")
+ Rails.logger.info("Connected(#{@worker_number})")
send_object({:type => "ok", :message => "Connected"})
send_account_all
end
def receive_unauthorized(msg)
- $logger.warn("Unauthorized(#{@worker_number}): #{msg["user_id"]}")
+ Rails.logger.warn("Unauthorized(#{@worker_number}): #{msg["user_id"]}")
# unregister
end
def receive_user(msg)
@@wq.push -> do
- $logger.debug("Received User(#{@worker_number}): #{msg["id"]}")
+ Rails.logger.debug("Received User(#{@worker_number}): #{msg["id"]}")
User.from_hash(:id => msg["id"],
:screen_name => msg["screen_name"],
:name => msg["name"],
@@ -125,38 +151,39 @@ class Receiver::Worker < DaemonSpawn::Base
end
def receive_tweet(msg)
- Rails.logger.silence do
- @@wq.push -> do
- $logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}")
- unless @@saved_tweets.include?(msg["id"])
- @@saved_tweets << msg["id"]
- if @@saved_tweets.size > 10000
- $logger.debug("Tweet ids dropped: #{@@saved_tweets.shift}")
- end
-
- Tweet.from_hash(:id => msg["id"],
- :text => msg["text"],
- :source => msg["source"],
- :tweeted_at => Time.parse(msg["tweeted_at"]),
- :user_id => msg["user_id"])
- else
- $logger.debug("Tweet already exists(#{@worker_number}): #{msg["id"]}")
+ @@wq.push -> do
+ Rails.logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}")
+ unless @@saved_tweets.include?(msg["id"])
+ @@saved_tweets << msg["id"]
+ if @@saved_tweets.size > 10000
+ Rails.logger.debug("Tweet id dropped from cache: #{@@saved_tweets.shift}")
end
+
+ Tweet.from_hash(:id => msg["id"],
+ :text => msg["text"],
+ :source => msg["source"],
+ :tweeted_at => Time.parse(msg["tweeted_at"]),
+ :user_id => msg["user_id"])
+ else
+ Rails.logger.debug("Tweet already exists(#{@worker_number}): #{msg["id"]}")
end
end
end
def receive_favorite(msg)
- @@wq.push -> do
- $logger.debug("Receive Favorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
- Favorite.from_hash(:tweet_id => msg["tweet_id"],
- :user_id => msg["user_id"])
+ @@nq.push -> do
+ Rails.logger.debug("Receive Favorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
+ f = Favorite.from_hash(:tweet_id => msg["tweet_id"],
+ :user_id => msg["user_id"])
+ if t = Tweet.cached(msg["tweet_id"])
+ t.notify_favorite
+ end
end
end
def receive_retweet(msg)
@@wq.push -> do
- $logger.debug("Receive Retweet(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
+ Rails.logger.debug("Receive Retweet(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
Retweet.from_hash(:id => msg["id"],
:tweet_id => msg["tweet_id"],
:user_id => msg["user_id"])
@@ -166,22 +193,15 @@ class Receiver::Worker < DaemonSpawn::Base
def receive_delete(msg)
@@wq.push -> do
if msg["id"]
- $logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}")
+ Rails.logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}")
Tweet.delete_from_id(msg["id"])
- Retweet.delete_from_id(msg["id"])
elsif msg["tweet_id"]
- $logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
+ Rails.logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user_id"]} => #{msg["tweet_id"]}")
Favorite.delete_from_hash(:tweet_id => msg["tweet_id"],
:user_id => msg["user_id"])
end
end
end
-
- def receive_quit(msg)
- $logger.warn("Quit(#{@worker_number}): #{msg["reason"]}")
- send_data({:type => "bye", :message => "Bye"})
- close_connection_after_writing
- end
end
class RegisterServer < EM::Connection
@@ -196,7 +216,7 @@ class Receiver::Worker < DaemonSpawn::Base
@pac.feed_each(data) do |msg|
p msg
unless msg["type"]
- $logger.error("Unknown message")
+ Rails.logger.error("Unknown message")
send_object({:type => "fatal", :message => "Unknown message"})
close_connection_after_writing
return
@@ -206,21 +226,15 @@ class Receiver::Worker < DaemonSpawn::Base
when "register"
account = Account.where(:id => msg["id"]).first
if account
- if con = $connections[account.id % Settings.worker_count]
- con.send_account(account)
- send_object({:type => "ok", :message => "Registered"})
- $logger.info("Account registered and sent")
- else
- send_object({:type => "ok", :message => "Registered but not started"})
- $logger.info("Account registered")
- end
+ DBProxyServer.send_account(account)
+ Rails.logger.info("Account registered and sent")
else
- $logger.error("Unknown account id")
+ Rails.logger.error("Unknown account id")
send_object({:type => "error", :message => "Unknown account id"})
end
close_connection_after_writing
else
- $logger.warn("Unknown register command: #{msg["type"]}")
+ Rails.logger.warn("Unknown register command: #{msg["type"]}")
end
end
end
@@ -228,22 +242,23 @@ class Receiver::Worker < DaemonSpawn::Base
def initialize(opts = {})
super(opts)
- $logger = Receiver::Logger.new(Rails.env.development? ? :debug : :info)
- $connections = {}
+ ActiveRecord::Base.logger = Rails.logger = Logger.new(STDOUT)
end
def start(args)
- $logger.info("Database Proxy Started")
+ Rails.logger.info("Database Proxy Started")
EM.run do
+ o = EM.start_server("0.0.0.0", Settings.db_proxy_port, DBProxyServer)
+ i = EM.start_unix_domain_server(Settings.register_server_path, RegisterServer)
+
stop = Proc.new do
+ EM.stop_server(o)
+ EM.stop_server(i)
EM.stop
end
Signal.trap(:INT, &stop)
Signal.trap(:QUIT, &stop)
Signal.trap(:TERM, &stop)
-
- EM.start_server("0.0.0.0", Settings.db_proxy_port, DBProxyServer)
- EM.start_unix_domain_server(Settings.register_server_path, RegisterServer)
end
end