diff options
author | Rhenium <rhenium@rhe.jp> | 2014-02-28 06:29:16 +0900 |
---|---|---|
committer | Rhenium <rhenium@rhe.jp> | 2014-02-28 06:29:16 +0900 |
commit | b2c328e84d03e73d94938f2f2bf58585feaf9625 (patch) | |
tree | e697a1b1cd65f0f224c8e9dce0b06138b898f4ef | |
parent | 8e70230529b7c9a3979bca3479e5043a0773912e (diff) | |
download | aclog-b2c328e84d03e73d94938f2f2bf58585feaf9625.tar.gz |
refactor collector-receiver
-rw-r--r-- | app/controllers/sessions_controller.rb | 10 | ||||
-rw-r--r-- | app/controllers/tweets_controller.rb | 6 | ||||
-rw-r--r-- | app/models/favorite.rb | 35 | ||||
-rw-r--r-- | app/models/notification.rb | 12 | ||||
-rw-r--r-- | app/models/retweet.rb | 39 | ||||
-rw-r--r-- | app/models/tweet.rb | 91 | ||||
-rw-r--r-- | app/models/user.rb | 2 | ||||
-rw-r--r-- | collector/connection.rb | 25 | ||||
-rw-r--r-- | collector/user_stream.rb | 155 | ||||
-rw-r--r-- | lib/aclog/receiver/collector_connection.rb | 16 |
10 files changed, 211 insertions, 180 deletions
diff --git a/app/controllers/sessions_controller.rb b/app/controllers/sessions_controller.rb index 897a6cc..068cf3c 100644 --- a/app/controllers/sessions_controller.rb +++ b/app/controllers/sessions_controller.rb @@ -7,11 +7,11 @@ class SessionsController < ApplicationController oauth_token_secret: auth["credentials"]["secret"]) account.update_connection - User.from_json(id: account.user_id, - screen_name: auth["extra"]["raw_info"]["screen_name"], - name: auth["extra"]["raw_info"]["name"], - profile_image_url: auth["extra"]["raw_info"]["profile_image_url_https"], - protected: auth["extra"]["raw_info"]["protected"]) + User.create_from_json(id: account.user_id, + screen_name: auth["extra"]["raw_info"]["screen_name"], + name: auth["extra"]["raw_info"]["name"], + profile_image_url: auth["extra"]["raw_info"]["profile_image_url_https"], + protected: auth["extra"]["raw_info"]["protected"]) session[:account] = account session[:user_id] = account.user_id diff --git a/app/controllers/tweets_controller.rb b/app/controllers/tweets_controller.rb index bf27a25..da49db1 100644 --- a/app/controllers/tweets_controller.rb +++ b/app/controllers/tweets_controller.rb @@ -68,7 +68,11 @@ class TweetsController < ApplicationController end def import - tweet = Tweet.import(params[:id], current_user.account) + if logged_in? + tweet = Tweet.import(params[:id], current_user.account.client) + else + tweet = Tweet.import(params[:id]) + end redirect_to tweet end diff --git a/app/models/favorite.rb b/app/models/favorite.rb index bc66127..d266135 100644 --- a/app/models/favorite.rb +++ b/app/models/favorite.rb @@ -2,25 +2,32 @@ class Favorite < ActiveRecord::Base belongs_to :tweet belongs_to :user - after_create do - Tweet.update_counters(self.tweet_id, favorites_count: 1, reactions_count: 1) - end - - after_destroy do - Tweet.update_counters(self.tweet_id, favorites_count: -1, reactions_count: -1) - end + def self.create_from_json(json) + tweet = Tweet.create_from_json(json[:target_object]) + user = User.create_from_json(json[:source]) - def self.from_json(json) - tweet = Tweet.from_json(json[:target_object]) - user = User.from_json(json[:source]) favorite = Favorite.new(tweet: tweet, user: user) - favorite.save! - logger.debug("Successfully created a favorite: #{favorite.id}") + + transaction do + favorite.save! + tweet.update_reactions_count(favorites_count: 1, json: json[:target_object]) + end rescue ActiveRecord::RecordNotUnique => e - logger.debug("Failed to create a favorite: #{favorite}: #{e.class}") + logger.debug("Duplicate favorite: #{favorite.user_id} => #{favorite.tweet_id}") rescue => e - logger.error("Failed to create a favorite: #{favorite}: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}") + logger.error("Failed to create a favorite: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}") ensure return favorite end + + def self.destroy_from_json(json) + transaction do + deleted_count = self.where(user_id: json[:source][:id], tweet_id: json[:target_object][:id]).delete_all + if deleted_count > 0 + Tweet.find(json[:target_object][:id]).update_reactions_count(favorites_count: -1, json: json[:target_object]) + end + end + rescue => e + logger.error("Failed to destroy a favorite: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}") + end end diff --git a/app/models/notification.rb b/app/models/notification.rb index 822e0ee..30944ca 100644 --- a/app/models/notification.rb +++ b/app/models/notification.rb @@ -1,8 +1,10 @@ class Notification - def self.notify_favorite(tweet) - if Settings.notification.favorites.include?(tweet.favorites.count) + def self.notify_favorites_count(tweet) + return unless Settings.notification.enabled + + if Settings.notification.favorites.include?(tweet.favorites_count) if tweet.user.registered? && tweet.user.account.active? && tweet.user.account.notification? - reply_favs(tweet, tweet.favorites.count) + reply_favs(tweet, tweet.favorites_count) end end end @@ -21,8 +23,8 @@ class Notification begin client = Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key, consumer_secret: Settings.notification.consumer.secret, - oauth_token: Settings.notification.accounts[cur].token, - oauth_token_secret: Settings.notification.accounts[cur].secret) + access_token: Settings.notification.accounts[cur].token, + access_token_secret: Settings.notification.accounts[cur].secret) client.update(text, in_reply_to_status_id: reply_to) rescue Twitter::Error::AlreadyPosted diff --git a/app/models/retweet.rb b/app/models/retweet.rb index 07a641a..a30aea2 100644 --- a/app/models/retweet.rb +++ b/app/models/retweet.rb @@ -2,25 +2,36 @@ class Retweet < ActiveRecord::Base belongs_to :tweet belongs_to :user - after_create do - Tweet.update_counters(self.tweet_id, retweets_count: 1, reactions_count: 1) - end - - after_destroy do - Tweet.update_counters(self.tweet_id, retweets_count: -1, reactions_count: -1) - end + def self.create_from_json(json) + tweet = Tweet.create_from_json(json[:retweeted_status]) + user = User.create_from_json(json[:user]) - def self.from_json(json) - tweet = Tweet.from_json(json[:retweeted_status]) - user = User.from_json(json[:user]) retweet = Retweet.new(id: json[:id], tweet: tweet, user: user) - retweet.save! - logger.debug("Successfully created a retweet: #{retweet.id}") + + transaction do + retweet.save! + tweet.update_reactions_count(retweets_count: 1, json: json[:retweeted_status]) + end rescue ActiveRecord::RecordNotUnique => e - logger.debug("Failed to create a retweet: #{retweet}: #{e.class}") + logger.debug("Duplicate retweet: #{retweet.id}: #{retweet.user_id} => #{retweet.tweet_id}") rescue => e - logger.error("Failed to create a retweet: #{retweet}: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}") + logger.error("Failed to create a retweet: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}") ensure return retweet end + + def self.destroy_from_json(json) + transaction do + retweet = self.where(id: json[:delete][:status][:id]).first + + if retweet + deleted_count = self.delete(retweet.id) + if deleted_count > 0 + retweet.tweet.update_reactions_count(retweets_count: -1) + end + end + end + rescue => e + logger.error("Failed to destroy a retweet: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}") + end end diff --git a/app/models/tweet.rb b/app/models/tweet.rb index be1df7c..6b114bc 100644 --- a/app/models/tweet.rb +++ b/app/models/tweet.rb @@ -39,12 +39,6 @@ class Tweet < ActiveRecord::Base "https://twitter.com/#{self.user.screen_name}/status/#{self.id}" end - def notify_favorite - if Settings.notification.enabled - Notification.notify_favorite(self) - end - end - def reply_ancestors(max_level = Float::INFINITY) nodes = [] node = self @@ -69,57 +63,68 @@ class Tweet < ActiveRecord::Base nodes.sort_by {|t| t.id } end - def self.from_json(json) - find_by(id: json[:id]) || begin - user = User.from_json(json[:user]) - tweet = Tweet.new(id: json[:id], - text: extract_entities(json), - source: json[:source], - tweeted_at: json[:created_at], - in_reply_to_id: json[:in_reply_to_status_id], - user: user) - tweet.save! - logger.debug("Successfully created a tweet: #{tweet.id}") - rescue ActiveRecord::RecordNotUnique => e - logger.debug("Failed to create a tweet: #{tweet}: #{e.class}") - rescue => e - logger.error("Failed to create a tweet: #{tweet}: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}") - ensure - return tweet + def self.create_from_json(json) + tweet = transaction do + self.find_by(id: json[:id]) || + self.create!(id: json[:id], + text: extract_entities(json), + source: json[:source], + tweeted_at: json[:created_at], + in_reply_to_id: json[:in_reply_to_status_id], + user: User.create_from_json(json[:user])) end + rescue ActiveRecord::RecordNotUnique => e + logger.debug("Duplicate tweet: #{tweet}: #{e.class}") + rescue => e + logger.error("Failed to create a tweet: #{tweet}: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}") + ensure + return tweet + end + + def self.create_from_twitter_object(obj) + t = self.create_from_json(obj.attrs) + t.update_reactions_count(json: obj.attrs) + t end - def self.from_twitter_object(obj) - transaction do - tweet = from_json(obj.attrs) - favs = [obj.favorite_count, tweet.favorites_count].max - rts = [obj.retweet_count, tweet.retweets_count].max - tweet.update!(favorites_count: favs, - retweets_count: rts, - reactions_count: favs + rts) - tweet + def self.destroy_from_json(json) + deleted_count = self.delete(json[:delete][:status][:id]) + + if deleted_count > 0 + Favorite.where(tweet_id: json[:delete][:status][:id]).delete_all + Retweet.where(tweet_id: json[:delete][:status][:id]).delete_all + true + else + false end end - def self.import(id, account = nil) - account ||= Account.random + def update_reactions_count(favorites_count: 0, retweets_count: 0, json: {}) + fav_op = favorites_count >= 0 ? "+" : "-" + rts_op = retweets_count >= 0 ? "+" : "-" + Tweet.where(id: self.id) + .update_all("favorites_count = GREATEST(favorites_count #{fav_op} #{favorites_count.abs}, #{(json[:favorite_count] || 0).to_i}), " + + "retweets_count = GREATEST(retweets_count #{rts_op} #{retweets_count.abs}, #{(json[:retweet_count] || 0).to_i}), " + + "reactions_count = favorites_count + retweets_count") + end + + def self.import(id, client = nil) + client ||= Account.random.client - tweet = self.from_twitter_object(account.client.status(id)) + st = client.status(id) + tweet = self.create_from_twitter_object(st) + tweet.update(text: extract_entities(st.attrs), + source: st.attrs[:source], + in_reply_to_id: st.attrs[:in_reply_to_status_id]) begin nt = tweet - while !nt.in_reply_to && nt.in_reply_to_id - nt = self.from_twitter_object(account.client.status(nt.in_reply_to_id)) - end + nt = self.create_from_twitter_object(client.status(nt.in_reply_to_id)) while !nt.in_reply_to && nt.in_reply_to_id rescue Twitter::Error - Rails.logger.warn($!) - return tweet + logger.warn($!) end tweet.reload - rescue Twitter::Error - Rails.logger.warn($!) - return nil end def self.filter_by_query(query) diff --git a/app/models/user.rb b/app/models/user.rb index 76d7b6b..49ed7cd 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -28,7 +28,7 @@ class User < ActiveRecord::Base key && where(key => value).order(updated_at: :desc).first || raise(ActiveRecord::RecordNotFound, "Couldn't find User with #{key}=#{value}") end - def self.from_json(json) + def self.create_from_json(json) user = where(id: json[:id]).first_or_initialize orig = user.attributes.dup diff --git a/collector/connection.rb b/collector/connection.rb index f73fbbd..7407fa4 100644 --- a/collector/connection.rb +++ b/collector/connection.rb @@ -7,8 +7,8 @@ module Aclog::Collector def initialize(logger) @logger = logger @clients = {} - @unpacker = MessagePack::Unpacker.new - @reconnect = true + @unpacker = MessagePack::Unpacker.new(symbolize_keys: true) + @exiting = false end def post_init @@ -17,7 +17,7 @@ module Aclog::Collector end def unbind - if @reconnect + if !@exiting log(:info, "reconnecting...") EM.add_timer(10) do @@ -29,36 +29,31 @@ module Aclog::Collector def receive_data(data) @unpacker.feed_each(data) do |msg| - unless msg.is_a?(Hash) && msg["type"] + unless msg.is_a?(Hash) && msg[:type] log(:warn, "unknown data: #{msg}") - @reconnect = false - close_connection return end - case msg["type"] + case msg[:type] when "ok" log(:info, "connection established") when "error" log(:error, "error: #{msg}") when "fatal" log(:fatal, "fatal: #{msg}") - @reconnect = false - close_connection when "account" - account_id = msg["id"] + account_id = msg[:id] if @clients[account_id] @clients[account_id].update(msg) + log(:info, "updated: #{account_id}") else - stream = UserStream.new(@logger, msg) do |event, data| - send_object(data.merge(type: event)) - end + stream = UserStream.new(@logger, msg, ->(event, data) { send_object(data.merge(type: event)) }) stream.start @clients[account_id] = stream log(:info, "registered: #{account_id}") end when "stop" - account_id = msg["id"] + account_id = msg[:id] client = @clients[account_id] if client client.stop @@ -72,7 +67,7 @@ module Aclog::Collector end def quit - @reconnect = false + @exiting = true send_object(type: "quit", reason: "stop") @clients.values.each(&:stop) end diff --git a/collector/user_stream.rb b/collector/user_stream.rb index 7886694..f66bd91 100644 --- a/collector/user_stream.rb +++ b/collector/user_stream.rb @@ -5,11 +5,11 @@ require "./settings" module Aclog module Collector class UserStream - def initialize(logger, msg, &blk) + def initialize(logger, msg, callback) @logger = logger - @user_id = msg["user_id"] - @account_id = msg["id"] - @callback = blk + @user_id = msg[:user_id] + @account_id = msg[:id] + @callback = callback prepare_client(msg) end @@ -18,93 +18,61 @@ module Aclog end def prepare_client(msg) - client = EM::Twitter::Client.new(make_opts(msg)) + client = EM::Twitter::Client.new(client_opts(msg)) + client.on_error do |message| log(:error, "Unknown error: #{message}") end - client.on_enhance_your_calm do - log(:warn, "Enhance your calm") - end - client.on_no_data_received do log(:warn, "No data received") end - client.on_close do - log(:info, "disconnect") - end - client.on_reconnect do |timeout, retries| log(:info, "Reconnected: #{retries}") end client.on_max_reconnects do |timeout, retries| log(:warn, "Reached max reconnects: #{retries}") - stop + self.stop end client.on_unauthorized do log(:warn, "Unauthorized") callback(:unauthorized, id: @account_id, user_id: @user_id) - stop + self.stop end client.on_service_unavailable do - # account deleted? + # Twitter account deleted? log(:warn, "Service unavailable") - stop + self.stop end client.each do |item| begin - hash = Yajl::Parser.parse(item, symbolize_keys: true) + json = Yajl::Parser.parse(item, symbolize_keys: true) rescue Yajl::ParseError log(:warn, "JSON parse error: #{item}") next end - if hash[:warning] - log(:warn, "warning: #{hash[:warning]}") - elsif hash[:limit] - log(:warn, "limit: #{hash[:limit][:track]}") - elsif hash[:delete] - if d = hash[:delete][:status] - log(:debug, "delete: #{hash[:delete][:status]}") - callback(:delete, d) - end - elsif hash[:event] - case hash[:event] - when "favorite" - log(:debug, "favorite: #{hash[:source][:id]} => #{hash[:target_object][:id]}") - callback(:favorite, - source: reduce_user(hash[:source]), - target_object: reduce_tweet(hash[:target_object])) - when "unfavorite" - log(:debug, "unfavorite: #{hash[:source][:id]} => #{hash[:target_object][:id]}") - callback(:unfavorite, - source: reduce_user(hash[:source]), - target_object: reduce_tweet(hash[:target_object])) - end - elsif hash[:user] - if hash[:retweeted_status] - if hash[:retweeted_status][:user][:id] == @user_id || hash[:user][:id] == @user_id - log(:debug, "retweet: #{hash[:user][:id]} => #{hash[:retweeted_status][:id]}") - callback(:retweet, - id: hash[:id], - user: reduce_user(hash[:user]), - retweeted_status: reduce_tweet(hash[:retweeted_status])) - end - elsif hash[:user][:id] == @user_id - log(:debug, "tweet: #{hash[:user][:id]} => #{hash[:id]}") - callback(:tweet, reduce_tweet(hash)) - end - elsif hash[:friends] - log(:debug, "friends: #{hash[:friends].size}") - elsif hash[:scrub_geo] - log(:debug, "scrub_geo: #{hash}") + if json[:delete] && json[:delete][:status] + on_delete(json) + elsif json[:event] == "favorite" + on_favorite(json) + elsif json[:event] == "unfavorite" + on_unfavorite(json) + elsif json[:user] && json[:retweeted_status] + on_retweet(json) + elsif json[:user] + on_tweet(json) + elsif json[:friends] + log(:debug, "friends: #{json[:friends].size}") + elsif json[:warning] + log(:warn, "warning: #{json[:warning]}") else - log(:info, "Unknown streaming data: #{hash}") + # scrub_geo, limit, unknown message end end @client = client @@ -116,9 +84,8 @@ module Aclog end def update(hash) - opts = make_opts(hash) + opts = client_opts(hash) if opts[:oauth][:token] != @client.options[:oauth][:token] - log(:info, "update") @client.connection.update(opts) end end @@ -128,37 +95,79 @@ module Aclog end private - def log(level, message) - @logger.send(level, "[USERSTREAM:##{@account_id}:#{@user_id}] #{message}") + def on_tweet(json) + log(:debug, "tweet: #{json[:user][:id]} => #{json[:id]}") + callback(:tweet, reduce_tweet(json)) + end + + def on_retweet(json) + log(:debug, "retweet: #{json[:user][:id]} => #{json[:retweeted_status][:id]}") + callback(:retweet, + id: json[:id], + user: reduce_user(json[:user]), + retweeted_status: reduce_tweet(json[:retweeted_status])) end - def make_opts(msg) - { host: "userstream.twitter.com", + def on_favorite(json) + log(:debug, "favorite: #{json[:source][:id]} => #{json[:target_object][:id]}") + callback(:favorite, + source: reduce_user(json[:source]), + target_object: reduce_tweet(json[:target_object])) + end + + def on_unfavorite(json) + log(:debug, "unfavorite: #{json[:source][:id]} => #{json[:target_object][:id]}") + callback(:unfavorite, + source: reduce_user(json[:source]), + target_object: reduce_tweet(json[:target_object])) + end + + def on_delete(json) + log(:debug, "delete: #{json[:delete][:status]}") + callback(:delete, json) + end + + def client_opts(msg) + { + method: :get, + host: "userstream.twitter.com", path: "/1.1/user.json", params: { with: "user" }, - oauth: { consumer_key: msg["consumer_key"], - consumer_secret: msg["consumer_secret"], - token: msg["oauth_token"], - token_secret: msg["oauth_token_secret"] }, - method: :get } + oauth: { + consumer_key: msg[:consumer_key], + consumer_secret: msg[:consumer_secret], + token: msg[:oauth_token], + token_secret: msg[:oauth_token_secret] + } + } end def reduce_user(user) - { id: user[:id], + { + id: user[:id], screen_name: user[:screen_name], name: user[:name], profile_image_url: user[:profile_image_url_https], - protected: user[:protected] } + protected: user[:protected] + } end def reduce_tweet(status) - { id: status[:id], + { + id: status[:id], text: status[:text], entities: status[:entities], source: status[:source], created_at: status[:created_at], in_reply_to_status_id: status[:in_reply_to_status_id], - user: reduce_user(status[:user]) } + favorite_count: status[:favorite_count], + retweet_count: status[:retweet_count], + user: reduce_user(status[:user]) + } + end + + def log(level, message) + @logger.__send__(level, "[USERSTREAM:##{@account_id}:#{@user_id}] #{message}") end end end diff --git a/lib/aclog/receiver/collector_connection.rb b/lib/aclog/receiver/collector_connection.rb index c1258de..9a6128f 100644 --- a/lib/aclog/receiver/collector_connection.rb +++ b/lib/aclog/receiver/collector_connection.rb @@ -62,30 +62,28 @@ module Aclog when "tweet" @channel << -> { log(:debug, "receive tweet: #{msg[:id]}") - Tweet.from_json(msg) + Tweet.create_from_json(msg) } when "favorite" @channel << -> { log(:debug, "receive favorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}") - if f = Favorite.from_json(msg) - f.tweet.notify_favorite - end + f = Favorite.create_from_json(msg) + Notification.notify_favorites_count(f.tweet) } when "unfavorite" @channel << -> { log(:debug, "receive unfavorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}") - Favorite.where(user_id: msg[:source][:id], tweet_id: msg[:target_object][:id]).destroy_all + Favorite.destroy_from_json(msg) } when "retweet" @channel << -> { log(:debug, "receive retweet: #{msg[:user][:id]} => #{msg[:retweeted_status][:id]}") - Retweet.from_json(msg) + Retweet.create_from_json(msg) } when "delete" @channel << -> { - log(:debug, "receive delete: #{msg[:id]}") - Tweet.where(id: msg[:id]).destroy_all - Retweet.where(id: msg[:id]).destroy_all + log(:debug, "receive delete: #{msg[:delete][:status][:id]}") + Tweet.destroy_from_json(msg) || Retweet.destroy_from_json(msg) } when "quit" log(:info, "receive quit: #{msg[:reason]}") |