aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRhenium <rhenium@rhe.jp>2014-02-28 06:29:16 +0900
committerRhenium <rhenium@rhe.jp>2014-02-28 06:29:16 +0900
commitb2c328e84d03e73d94938f2f2bf58585feaf9625 (patch)
treee697a1b1cd65f0f224c8e9dce0b06138b898f4ef
parent8e70230529b7c9a3979bca3479e5043a0773912e (diff)
downloadaclog-b2c328e84d03e73d94938f2f2bf58585feaf9625.tar.gz
refactor collector-receiver
-rw-r--r--app/controllers/sessions_controller.rb10
-rw-r--r--app/controllers/tweets_controller.rb6
-rw-r--r--app/models/favorite.rb35
-rw-r--r--app/models/notification.rb12
-rw-r--r--app/models/retweet.rb39
-rw-r--r--app/models/tweet.rb91
-rw-r--r--app/models/user.rb2
-rw-r--r--collector/connection.rb25
-rw-r--r--collector/user_stream.rb155
-rw-r--r--lib/aclog/receiver/collector_connection.rb16
10 files changed, 211 insertions, 180 deletions
diff --git a/app/controllers/sessions_controller.rb b/app/controllers/sessions_controller.rb
index 897a6cc..068cf3c 100644
--- a/app/controllers/sessions_controller.rb
+++ b/app/controllers/sessions_controller.rb
@@ -7,11 +7,11 @@ class SessionsController < ApplicationController
oauth_token_secret: auth["credentials"]["secret"])
account.update_connection
- User.from_json(id: account.user_id,
- screen_name: auth["extra"]["raw_info"]["screen_name"],
- name: auth["extra"]["raw_info"]["name"],
- profile_image_url: auth["extra"]["raw_info"]["profile_image_url_https"],
- protected: auth["extra"]["raw_info"]["protected"])
+ User.create_from_json(id: account.user_id,
+ screen_name: auth["extra"]["raw_info"]["screen_name"],
+ name: auth["extra"]["raw_info"]["name"],
+ profile_image_url: auth["extra"]["raw_info"]["profile_image_url_https"],
+ protected: auth["extra"]["raw_info"]["protected"])
session[:account] = account
session[:user_id] = account.user_id
diff --git a/app/controllers/tweets_controller.rb b/app/controllers/tweets_controller.rb
index bf27a25..da49db1 100644
--- a/app/controllers/tweets_controller.rb
+++ b/app/controllers/tweets_controller.rb
@@ -68,7 +68,11 @@ class TweetsController < ApplicationController
end
def import
- tweet = Tweet.import(params[:id], current_user.account)
+ if logged_in?
+ tweet = Tweet.import(params[:id], current_user.account.client)
+ else
+ tweet = Tweet.import(params[:id])
+ end
redirect_to tweet
end
diff --git a/app/models/favorite.rb b/app/models/favorite.rb
index bc66127..d266135 100644
--- a/app/models/favorite.rb
+++ b/app/models/favorite.rb
@@ -2,25 +2,32 @@ class Favorite < ActiveRecord::Base
belongs_to :tweet
belongs_to :user
- after_create do
- Tweet.update_counters(self.tweet_id, favorites_count: 1, reactions_count: 1)
- end
-
- after_destroy do
- Tweet.update_counters(self.tweet_id, favorites_count: -1, reactions_count: -1)
- end
+ def self.create_from_json(json)
+ tweet = Tweet.create_from_json(json[:target_object])
+ user = User.create_from_json(json[:source])
- def self.from_json(json)
- tweet = Tweet.from_json(json[:target_object])
- user = User.from_json(json[:source])
favorite = Favorite.new(tweet: tweet, user: user)
- favorite.save!
- logger.debug("Successfully created a favorite: #{favorite.id}")
+
+ transaction do
+ favorite.save!
+ tweet.update_reactions_count(favorites_count: 1, json: json[:target_object])
+ end
rescue ActiveRecord::RecordNotUnique => e
- logger.debug("Failed to create a favorite: #{favorite}: #{e.class}")
+ logger.debug("Duplicate favorite: #{favorite.user_id} => #{favorite.tweet_id}")
rescue => e
- logger.error("Failed to create a favorite: #{favorite}: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}")
+ logger.error("Failed to create a favorite: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}")
ensure
return favorite
end
+
+ def self.destroy_from_json(json)
+ transaction do
+ deleted_count = self.where(user_id: json[:source][:id], tweet_id: json[:target_object][:id]).delete_all
+ if deleted_count > 0
+ Tweet.find(json[:target_object][:id]).update_reactions_count(favorites_count: -1, json: json[:target_object])
+ end
+ end
+ rescue => e
+ logger.error("Failed to destroy a favorite: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}")
+ end
end
diff --git a/app/models/notification.rb b/app/models/notification.rb
index 822e0ee..30944ca 100644
--- a/app/models/notification.rb
+++ b/app/models/notification.rb
@@ -1,8 +1,10 @@
class Notification
- def self.notify_favorite(tweet)
- if Settings.notification.favorites.include?(tweet.favorites.count)
+ def self.notify_favorites_count(tweet)
+ return unless Settings.notification.enabled
+
+ if Settings.notification.favorites.include?(tweet.favorites_count)
if tweet.user.registered? && tweet.user.account.active? && tweet.user.account.notification?
- reply_favs(tweet, tweet.favorites.count)
+ reply_favs(tweet, tweet.favorites_count)
end
end
end
@@ -21,8 +23,8 @@ class Notification
begin
client = Twitter::REST::Client.new(consumer_key: Settings.notification.consumer.key,
consumer_secret: Settings.notification.consumer.secret,
- oauth_token: Settings.notification.accounts[cur].token,
- oauth_token_secret: Settings.notification.accounts[cur].secret)
+ access_token: Settings.notification.accounts[cur].token,
+ access_token_secret: Settings.notification.accounts[cur].secret)
client.update(text, in_reply_to_status_id: reply_to)
rescue Twitter::Error::AlreadyPosted
diff --git a/app/models/retweet.rb b/app/models/retweet.rb
index 07a641a..a30aea2 100644
--- a/app/models/retweet.rb
+++ b/app/models/retweet.rb
@@ -2,25 +2,36 @@ class Retweet < ActiveRecord::Base
belongs_to :tweet
belongs_to :user
- after_create do
- Tweet.update_counters(self.tweet_id, retweets_count: 1, reactions_count: 1)
- end
-
- after_destroy do
- Tweet.update_counters(self.tweet_id, retweets_count: -1, reactions_count: -1)
- end
+ def self.create_from_json(json)
+ tweet = Tweet.create_from_json(json[:retweeted_status])
+ user = User.create_from_json(json[:user])
- def self.from_json(json)
- tweet = Tweet.from_json(json[:retweeted_status])
- user = User.from_json(json[:user])
retweet = Retweet.new(id: json[:id], tweet: tweet, user: user)
- retweet.save!
- logger.debug("Successfully created a retweet: #{retweet.id}")
+
+ transaction do
+ retweet.save!
+ tweet.update_reactions_count(retweets_count: 1, json: json[:retweeted_status])
+ end
rescue ActiveRecord::RecordNotUnique => e
- logger.debug("Failed to create a retweet: #{retweet}: #{e.class}")
+ logger.debug("Duplicate retweet: #{retweet.id}: #{retweet.user_id} => #{retweet.tweet_id}")
rescue => e
- logger.error("Failed to create a retweet: #{retweet}: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}")
+ logger.error("Failed to create a retweet: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}")
ensure
return retweet
end
+
+ def self.destroy_from_json(json)
+ transaction do
+ retweet = self.where(id: json[:delete][:status][:id]).first
+
+ if retweet
+ deleted_count = self.delete(retweet.id)
+ if deleted_count > 0
+ retweet.tweet.update_reactions_count(retweets_count: -1)
+ end
+ end
+ end
+ rescue => e
+ logger.error("Failed to destroy a retweet: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}")
+ end
end
diff --git a/app/models/tweet.rb b/app/models/tweet.rb
index be1df7c..6b114bc 100644
--- a/app/models/tweet.rb
+++ b/app/models/tweet.rb
@@ -39,12 +39,6 @@ class Tweet < ActiveRecord::Base
"https://twitter.com/#{self.user.screen_name}/status/#{self.id}"
end
- def notify_favorite
- if Settings.notification.enabled
- Notification.notify_favorite(self)
- end
- end
-
def reply_ancestors(max_level = Float::INFINITY)
nodes = []
node = self
@@ -69,57 +63,68 @@ class Tweet < ActiveRecord::Base
nodes.sort_by {|t| t.id }
end
- def self.from_json(json)
- find_by(id: json[:id]) || begin
- user = User.from_json(json[:user])
- tweet = Tweet.new(id: json[:id],
- text: extract_entities(json),
- source: json[:source],
- tweeted_at: json[:created_at],
- in_reply_to_id: json[:in_reply_to_status_id],
- user: user)
- tweet.save!
- logger.debug("Successfully created a tweet: #{tweet.id}")
- rescue ActiveRecord::RecordNotUnique => e
- logger.debug("Failed to create a tweet: #{tweet}: #{e.class}")
- rescue => e
- logger.error("Failed to create a tweet: #{tweet}: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}")
- ensure
- return tweet
+ def self.create_from_json(json)
+ tweet = transaction do
+ self.find_by(id: json[:id]) ||
+ self.create!(id: json[:id],
+ text: extract_entities(json),
+ source: json[:source],
+ tweeted_at: json[:created_at],
+ in_reply_to_id: json[:in_reply_to_status_id],
+ user: User.create_from_json(json[:user]))
end
+ rescue ActiveRecord::RecordNotUnique => e
+ logger.debug("Duplicate tweet: #{tweet}: #{e.class}")
+ rescue => e
+ logger.error("Failed to create a tweet: #{tweet}: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}")
+ ensure
+ return tweet
+ end
+
+ def self.create_from_twitter_object(obj)
+ t = self.create_from_json(obj.attrs)
+ t.update_reactions_count(json: obj.attrs)
+ t
end
- def self.from_twitter_object(obj)
- transaction do
- tweet = from_json(obj.attrs)
- favs = [obj.favorite_count, tweet.favorites_count].max
- rts = [obj.retweet_count, tweet.retweets_count].max
- tweet.update!(favorites_count: favs,
- retweets_count: rts,
- reactions_count: favs + rts)
- tweet
+ def self.destroy_from_json(json)
+ deleted_count = self.delete(json[:delete][:status][:id])
+
+ if deleted_count > 0
+ Favorite.where(tweet_id: json[:delete][:status][:id]).delete_all
+ Retweet.where(tweet_id: json[:delete][:status][:id]).delete_all
+ true
+ else
+ false
end
end
- def self.import(id, account = nil)
- account ||= Account.random
+ def update_reactions_count(favorites_count: 0, retweets_count: 0, json: {})
+ fav_op = favorites_count >= 0 ? "+" : "-"
+ rts_op = retweets_count >= 0 ? "+" : "-"
+ Tweet.where(id: self.id)
+ .update_all("favorites_count = GREATEST(favorites_count #{fav_op} #{favorites_count.abs}, #{(json[:favorite_count] || 0).to_i}), " +
+ "retweets_count = GREATEST(retweets_count #{rts_op} #{retweets_count.abs}, #{(json[:retweet_count] || 0).to_i}), " +
+ "reactions_count = favorites_count + retweets_count")
+ end
+
+ def self.import(id, client = nil)
+ client ||= Account.random.client
- tweet = self.from_twitter_object(account.client.status(id))
+ st = client.status(id)
+ tweet = self.create_from_twitter_object(st)
+ tweet.update(text: extract_entities(st.attrs),
+ source: st.attrs[:source],
+ in_reply_to_id: st.attrs[:in_reply_to_status_id])
begin
nt = tweet
- while !nt.in_reply_to && nt.in_reply_to_id
- nt = self.from_twitter_object(account.client.status(nt.in_reply_to_id))
- end
+ nt = self.create_from_twitter_object(client.status(nt.in_reply_to_id)) while !nt.in_reply_to && nt.in_reply_to_id
rescue Twitter::Error
- Rails.logger.warn($!)
- return tweet
+ logger.warn($!)
end
tweet.reload
- rescue Twitter::Error
- Rails.logger.warn($!)
- return nil
end
def self.filter_by_query(query)
diff --git a/app/models/user.rb b/app/models/user.rb
index 76d7b6b..49ed7cd 100644
--- a/app/models/user.rb
+++ b/app/models/user.rb
@@ -28,7 +28,7 @@ class User < ActiveRecord::Base
key && where(key => value).order(updated_at: :desc).first || raise(ActiveRecord::RecordNotFound, "Couldn't find User with #{key}=#{value}")
end
- def self.from_json(json)
+ def self.create_from_json(json)
user = where(id: json[:id]).first_or_initialize
orig = user.attributes.dup
diff --git a/collector/connection.rb b/collector/connection.rb
index f73fbbd..7407fa4 100644
--- a/collector/connection.rb
+++ b/collector/connection.rb
@@ -7,8 +7,8 @@ module Aclog::Collector
def initialize(logger)
@logger = logger
@clients = {}
- @unpacker = MessagePack::Unpacker.new
- @reconnect = true
+ @unpacker = MessagePack::Unpacker.new(symbolize_keys: true)
+ @exiting = false
end
def post_init
@@ -17,7 +17,7 @@ module Aclog::Collector
end
def unbind
- if @reconnect
+ if !@exiting
log(:info, "reconnecting...")
EM.add_timer(10) do
@@ -29,36 +29,31 @@ module Aclog::Collector
def receive_data(data)
@unpacker.feed_each(data) do |msg|
- unless msg.is_a?(Hash) && msg["type"]
+ unless msg.is_a?(Hash) && msg[:type]
log(:warn, "unknown data: #{msg}")
- @reconnect = false
- close_connection
return
end
- case msg["type"]
+ case msg[:type]
when "ok"
log(:info, "connection established")
when "error"
log(:error, "error: #{msg}")
when "fatal"
log(:fatal, "fatal: #{msg}")
- @reconnect = false
- close_connection
when "account"
- account_id = msg["id"]
+ account_id = msg[:id]
if @clients[account_id]
@clients[account_id].update(msg)
+ log(:info, "updated: #{account_id}")
else
- stream = UserStream.new(@logger, msg) do |event, data|
- send_object(data.merge(type: event))
- end
+ stream = UserStream.new(@logger, msg, ->(event, data) { send_object(data.merge(type: event)) })
stream.start
@clients[account_id] = stream
log(:info, "registered: #{account_id}")
end
when "stop"
- account_id = msg["id"]
+ account_id = msg[:id]
client = @clients[account_id]
if client
client.stop
@@ -72,7 +67,7 @@ module Aclog::Collector
end
def quit
- @reconnect = false
+ @exiting = true
send_object(type: "quit", reason: "stop")
@clients.values.each(&:stop)
end
diff --git a/collector/user_stream.rb b/collector/user_stream.rb
index 7886694..f66bd91 100644
--- a/collector/user_stream.rb
+++ b/collector/user_stream.rb
@@ -5,11 +5,11 @@ require "./settings"
module Aclog
module Collector
class UserStream
- def initialize(logger, msg, &blk)
+ def initialize(logger, msg, callback)
@logger = logger
- @user_id = msg["user_id"]
- @account_id = msg["id"]
- @callback = blk
+ @user_id = msg[:user_id]
+ @account_id = msg[:id]
+ @callback = callback
prepare_client(msg)
end
@@ -18,93 +18,61 @@ module Aclog
end
def prepare_client(msg)
- client = EM::Twitter::Client.new(make_opts(msg))
+ client = EM::Twitter::Client.new(client_opts(msg))
+
client.on_error do |message|
log(:error, "Unknown error: #{message}")
end
- client.on_enhance_your_calm do
- log(:warn, "Enhance your calm")
- end
-
client.on_no_data_received do
log(:warn, "No data received")
end
- client.on_close do
- log(:info, "disconnect")
- end
-
client.on_reconnect do |timeout, retries|
log(:info, "Reconnected: #{retries}")
end
client.on_max_reconnects do |timeout, retries|
log(:warn, "Reached max reconnects: #{retries}")
- stop
+ self.stop
end
client.on_unauthorized do
log(:warn, "Unauthorized")
callback(:unauthorized, id: @account_id, user_id: @user_id)
- stop
+ self.stop
end
client.on_service_unavailable do
- # account deleted?
+ # Twitter account deleted?
log(:warn, "Service unavailable")
- stop
+ self.stop
end
client.each do |item|
begin
- hash = Yajl::Parser.parse(item, symbolize_keys: true)
+ json = Yajl::Parser.parse(item, symbolize_keys: true)
rescue Yajl::ParseError
log(:warn, "JSON parse error: #{item}")
next
end
- if hash[:warning]
- log(:warn, "warning: #{hash[:warning]}")
- elsif hash[:limit]
- log(:warn, "limit: #{hash[:limit][:track]}")
- elsif hash[:delete]
- if d = hash[:delete][:status]
- log(:debug, "delete: #{hash[:delete][:status]}")
- callback(:delete, d)
- end
- elsif hash[:event]
- case hash[:event]
- when "favorite"
- log(:debug, "favorite: #{hash[:source][:id]} => #{hash[:target_object][:id]}")
- callback(:favorite,
- source: reduce_user(hash[:source]),
- target_object: reduce_tweet(hash[:target_object]))
- when "unfavorite"
- log(:debug, "unfavorite: #{hash[:source][:id]} => #{hash[:target_object][:id]}")
- callback(:unfavorite,
- source: reduce_user(hash[:source]),
- target_object: reduce_tweet(hash[:target_object]))
- end
- elsif hash[:user]
- if hash[:retweeted_status]
- if hash[:retweeted_status][:user][:id] == @user_id || hash[:user][:id] == @user_id
- log(:debug, "retweet: #{hash[:user][:id]} => #{hash[:retweeted_status][:id]}")
- callback(:retweet,
- id: hash[:id],
- user: reduce_user(hash[:user]),
- retweeted_status: reduce_tweet(hash[:retweeted_status]))
- end
- elsif hash[:user][:id] == @user_id
- log(:debug, "tweet: #{hash[:user][:id]} => #{hash[:id]}")
- callback(:tweet, reduce_tweet(hash))
- end
- elsif hash[:friends]
- log(:debug, "friends: #{hash[:friends].size}")
- elsif hash[:scrub_geo]
- log(:debug, "scrub_geo: #{hash}")
+ if json[:delete] && json[:delete][:status]
+ on_delete(json)
+ elsif json[:event] == "favorite"
+ on_favorite(json)
+ elsif json[:event] == "unfavorite"
+ on_unfavorite(json)
+ elsif json[:user] && json[:retweeted_status]
+ on_retweet(json)
+ elsif json[:user]
+ on_tweet(json)
+ elsif json[:friends]
+ log(:debug, "friends: #{json[:friends].size}")
+ elsif json[:warning]
+ log(:warn, "warning: #{json[:warning]}")
else
- log(:info, "Unknown streaming data: #{hash}")
+ # scrub_geo, limit, unknown message
end
end
@client = client
@@ -116,9 +84,8 @@ module Aclog
end
def update(hash)
- opts = make_opts(hash)
+ opts = client_opts(hash)
if opts[:oauth][:token] != @client.options[:oauth][:token]
- log(:info, "update")
@client.connection.update(opts)
end
end
@@ -128,37 +95,79 @@ module Aclog
end
private
- def log(level, message)
- @logger.send(level, "[USERSTREAM:##{@account_id}:#{@user_id}] #{message}")
+ def on_tweet(json)
+ log(:debug, "tweet: #{json[:user][:id]} => #{json[:id]}")
+ callback(:tweet, reduce_tweet(json))
+ end
+
+ def on_retweet(json)
+ log(:debug, "retweet: #{json[:user][:id]} => #{json[:retweeted_status][:id]}")
+ callback(:retweet,
+ id: json[:id],
+ user: reduce_user(json[:user]),
+ retweeted_status: reduce_tweet(json[:retweeted_status]))
end
- def make_opts(msg)
- { host: "userstream.twitter.com",
+ def on_favorite(json)
+ log(:debug, "favorite: #{json[:source][:id]} => #{json[:target_object][:id]}")
+ callback(:favorite,
+ source: reduce_user(json[:source]),
+ target_object: reduce_tweet(json[:target_object]))
+ end
+
+ def on_unfavorite(json)
+ log(:debug, "unfavorite: #{json[:source][:id]} => #{json[:target_object][:id]}")
+ callback(:unfavorite,
+ source: reduce_user(json[:source]),
+ target_object: reduce_tweet(json[:target_object]))
+ end
+
+ def on_delete(json)
+ log(:debug, "delete: #{json[:delete][:status]}")
+ callback(:delete, json)
+ end
+
+ def client_opts(msg)
+ {
+ method: :get,
+ host: "userstream.twitter.com",
path: "/1.1/user.json",
params: { with: "user" },
- oauth: { consumer_key: msg["consumer_key"],
- consumer_secret: msg["consumer_secret"],
- token: msg["oauth_token"],
- token_secret: msg["oauth_token_secret"] },
- method: :get }
+ oauth: {
+ consumer_key: msg[:consumer_key],
+ consumer_secret: msg[:consumer_secret],
+ token: msg[:oauth_token],
+ token_secret: msg[:oauth_token_secret]
+ }
+ }
end
def reduce_user(user)
- { id: user[:id],
+ {
+ id: user[:id],
screen_name: user[:screen_name],
name: user[:name],
profile_image_url: user[:profile_image_url_https],
- protected: user[:protected] }
+ protected: user[:protected]
+ }
end
def reduce_tweet(status)
- { id: status[:id],
+ {
+ id: status[:id],
text: status[:text],
entities: status[:entities],
source: status[:source],
created_at: status[:created_at],
in_reply_to_status_id: status[:in_reply_to_status_id],
- user: reduce_user(status[:user]) }
+ favorite_count: status[:favorite_count],
+ retweet_count: status[:retweet_count],
+ user: reduce_user(status[:user])
+ }
+ end
+
+ def log(level, message)
+ @logger.__send__(level, "[USERSTREAM:##{@account_id}:#{@user_id}] #{message}")
end
end
end
diff --git a/lib/aclog/receiver/collector_connection.rb b/lib/aclog/receiver/collector_connection.rb
index c1258de..9a6128f 100644
--- a/lib/aclog/receiver/collector_connection.rb
+++ b/lib/aclog/receiver/collector_connection.rb
@@ -62,30 +62,28 @@ module Aclog
when "tweet"
@channel << -> {
log(:debug, "receive tweet: #{msg[:id]}")
- Tweet.from_json(msg)
+ Tweet.create_from_json(msg)
}
when "favorite"
@channel << -> {
log(:debug, "receive favorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
- if f = Favorite.from_json(msg)
- f.tweet.notify_favorite
- end
+ f = Favorite.create_from_json(msg)
+ Notification.notify_favorites_count(f.tweet)
}
when "unfavorite"
@channel << -> {
log(:debug, "receive unfavorite: #{msg[:source][:id]} => #{msg[:target_object][:id]}")
- Favorite.where(user_id: msg[:source][:id], tweet_id: msg[:target_object][:id]).destroy_all
+ Favorite.destroy_from_json(msg)
}
when "retweet"
@channel << -> {
log(:debug, "receive retweet: #{msg[:user][:id]} => #{msg[:retweeted_status][:id]}")
- Retweet.from_json(msg)
+ Retweet.create_from_json(msg)
}
when "delete"
@channel << -> {
- log(:debug, "receive delete: #{msg[:id]}")
- Tweet.where(id: msg[:id]).destroy_all
- Retweet.where(id: msg[:id]).destroy_all
+ log(:debug, "receive delete: #{msg[:delete][:status][:id]}")
+ Tweet.destroy_from_json(msg) || Retweet.destroy_from_json(msg)
}
when "quit"
log(:info, "receive quit: #{msg[:reason]}")