diff options
author | rhenium <re4k@re4k.info> | 2013-05-11 08:56:02 +0900 |
---|---|---|
committer | rhenium <re4k@re4k.info> | 2013-05-11 08:56:02 +0900 |
commit | 73fc0f1c0fbcc71743df2b8fba3c155986cf4cf9 (patch) | |
tree | 4e0694e4b15efd510e1461f578f15521ff3d8d14 /collector | |
parent | bd35c7dad0c8481f508e21815faa1f2d41287465 (diff) | |
download | aclog-73fc0f1c0fbcc71743df2b8fba3c155986cf4cf9.tar.gz |
update collector
Diffstat (limited to 'collector')
-rw-r--r-- | collector/connection.rb | 81 | ||||
-rw-r--r-- | collector/helper.rb | 45 | ||||
-rw-r--r-- | collector/logger.rb | 44 | ||||
-rw-r--r-- | collector/settings.rb | 6 | ||||
-rwxr-xr-x | collector/start.rb | 2 | ||||
-rw-r--r-- | collector/stream.rb | 184 | ||||
-rw-r--r-- | collector/worker.rb | 367 |
7 files changed, 333 insertions, 396 deletions
diff --git a/collector/connection.rb b/collector/connection.rb new file mode 100644 index 0000000..8a06b3f --- /dev/null +++ b/collector/connection.rb @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +require "msgpack" +require "./settings" +require "./stream" + +module Aclog + module Collector + class Connection < EM::Connection + def initialize(logger) + @logger = logger + @clients = {} + @pac = MessagePack::Unpacker.new + @connected = false + end + + def post_init + send_object( + type: "init", + secret_key: Settings.secret_key, + worker_number: Settings.worker_number + ) + end + + def unbind + @logger.info("Connection closed") if @connected + EM.add_timer(10) do + @connected = false + reconnect(Settings.receiver_host, Settings.receiver_port) + post_init + end + end + + # Server + def receive_data(data) + @pac.feed_each(data) do |msg| + if not msg.is_a?(Hash) or not msg["type"] + @logger.warn("Unknown data: #{msg}") + close_connection_after_writing + return + end + + case msg["type"] + when "ok" + @connected = true + @logger.info("Connected with server") + when "error" + @logger.info("error: #{msg["message"]}") + when "fatal" + @logger.info("fatal: #{msg["message"]}") + close_connection + when "bye" + close_connection + when "account" + account_id = msg["id"] + if not @clients[account_id] + user_connection = Aclog::Collector::Stream.new(@logger, method(:send_object), msg) + user_connection.start + @clients[account_id] = user_connection + else + @clients[account_id].update(msg) + end + else + @logger.info("Unknown message: #{msg}") + end + end + end + + def quit + send_object(type: "quit", reason: "stop") + @clients.values.map(&:stop) + end + + private + def send_object(data) + send_data(data.to_msgpack) + end + end + end +end + + diff --git a/collector/helper.rb b/collector/helper.rb new file mode 100644 index 0000000..30bdb9f --- /dev/null +++ b/collector/helper.rb @@ -0,0 +1,45 @@ +# -*- encoding: utf-8 -*- + +module Aclog + module Collector + module Helper + def format_text(status) + text = status[:text] + entities = status[:entities].map{|k, v| v.map{|n| n.update(type: k)}}.flatten.sort_by{|entity| entity[:indices].first} + + result = "" + last_index = entities.inject(0) do |last_index, entity| + result << text[last_index...entity[:indices].first] + case entity[:type] + when :urls, :media + result << "<url:#{escape_colon(entity[:expanded_url])}:#{escape_colon(entity[:display_url])}>" + when :hashtags + result << "<hashtag:#{escape_colon(entity[:text])}>" + when :user_mentions + result << "<mention:#{escape_colon(entity[:screen_name])}>" + when :symbols + result << "<symbol:#{escape_colon(entity[:text])}>" + end + + entity[:indices].last + end + result << text[last_index..-1] + + result + end + + def format_source(status) + if status[:source].include?("<a") + url, name = status[:source].scan(/<a href="(.+?)" rel="nofollow">(.+?)<\/a>/).flatten + "<url:#{escape_colon(url)}:#{escape_colon(name)}>" + else + # web, txt, .. + status[:source] + end + end + + private + def escape_colon(str); str.gsub(":", "%3A").gsub("<", "%3C").gsub(">", "%3E") end + end + end +end diff --git a/collector/logger.rb b/collector/logger.rb deleted file mode 100644 index eb09894..0000000 --- a/collector/logger.rb +++ /dev/null @@ -1,44 +0,0 @@ -module Aclog - class Logger - def debug(msg) - if @level == :debug - log(@out, "DEBUG", msg) - end - end - - def info(msg) - unless @level == :none - @level == :error - @level == :warn - log(@out, "INFO", msg) - end - end - - def warn(msg) - unless @level == :none - @level == :error - log(@err, "WARN", msg) - end - end - - def error(msg) - unless @level == :none - log(@err, "ERROR", msg) - end - end - - def fatal(msg) - log(@err, "FATAL", msg) - end - - def log(out, type, msg) - out.puts Time.now.utc.iso8601(3) + " " + type + ": " + msg.to_s - end - - def initialize(level = :warn) - @out = STDOUT - @err = STDERR - @level = level - end - end -end diff --git a/collector/settings.rb b/collector/settings.rb index 562bd75..71c8c28 100644 --- a/collector/settings.rb +++ b/collector/settings.rb @@ -1,6 +1,10 @@ require "settingslogic" class Settings < Settingslogic + def self.env + ENV["RAILS_ENV"] || "development" + end + source "settings.yml" - namespace ENV["RAILS_ENV"] + namespace env end diff --git a/collector/start.rb b/collector/start.rb index e8c7242..33c0827 100755 --- a/collector/start.rb +++ b/collector/start.rb @@ -4,6 +4,6 @@ require "./worker" $stdout.sync = true $stderr.sync = true -worker = Worker.new +worker = Aclog::Collector::Worker.new worker.start diff --git a/collector/stream.rb b/collector/stream.rb new file mode 100644 index 0000000..5220471 --- /dev/null +++ b/collector/stream.rb @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +require "em-twitter" +require "yajl" +require "./settings" +require "./helper" + +module Aclog + module Collector + class Stream + include Aclog::Collector::Helper + attr_reader :client + attr_accessor :logger + + def initialize(logger, callback, hash) + @logger = logger + @user_id = hash["user_id"] + @account_id = hash["id"] + @callback = callback + ready_client(hash) + end + + def ready_client(hash) + @client = EM::Twitter::Client.new(gopts(hash)) + @client.on_error {|message| + log(:error, "Unknown Error", message) } + @client.on_enhance_your_calm { + log(:warn, "Enhance your calm") } + @client.on_no_data_received { + log(:warn, "No data received") } + @client.on_reconnect {|timeout, retries| + log(:warn, "Reconnected", retries) } + @client.on_max_reconnects {|timeout, retries| + log(:warn, "Reached max reconnects", retries) } + @client.on_unauthorized { + log(:warn, "Unauthorized") + @callback.call(type: "unauthorized", user_id: @user_id, id: @account_id) } + + @client.each do |chunk| + begin + hash = Yajl::Parser.parse(chunk, :symbolize_keys => true) + rescue Yajl::ParseError + log(:warn, "Unexpected chunk", chunk) + next + end + + if hash[:warning] + log(:info, "Stall warning", hash[:warning]) + elsif hash[:delete] + if d = hash[:delete][:status] + send_delete(d[:id], d[:user_id]) + end + elsif hash[:limit] + log(:warn, "UserStreams Limit", hash[:limit][:track]) + elsif hash[:event] + # event + case hash[:event] + when "favorite" + send_favorite(hash[:source], hash[:target_object]) + when "unfavorite" + send_unfavorite(hash[:source], hash[:target_object]) + end + elsif hash[:user] + # tweet + if hash[:retweeted_status] + if hash[:retweeted_status][:user][:id] == @user_id || + hash[:user][:id] == @user_id + send_retweet(hash) + end + elsif hash[:user][:id] == @user_id + send_tweet(hash) + end + elsif hash[:friends] + # maybe first message + log(:debug, "Received friends", hash[:friends].size) + else + log(:info, "Unexpected UserStreams data", hash) + end + end + @client + end + + def start + @client.connect + log(:info, "Connected") + end + + def update(hash) + opts = gopts(hash) + if opts[:oauth][:token] != @client.options[:oauth][:token] + @client.connection.update(opts) + log(:info, "Connection updated") + end + end + + def stop + @client.connection.stop + end + + private + def log(level, msg, data = nil) + @logger.send(level, "#{msg}(##{@account_id}/#{@user_id}): #{data}") + end + + def gopts(msg) + { + host: "userstream.twitter.com", + path: "/1.1/user.json", + oauth: { + consumer_key: Settings.consumer[msg["consumer_version"]].key, + consumer_secret: Settings.consumer[msg["consumer_version"]].secret, + token: msg["oauth_token"], + token_secret: msg["oauth_token_secret"]}, + method: :get + } + end + + def send_user(user) + @callback.call( + type: "user", + id: user[:id], + screen_name: user[:screen_name], + name: user[:name], + profile_image_url: user[:profile_image_url_https], + protected: user[:protected] + ) + log(:debug, "Sent user", user[:id]) + end + + def send_tweet(status) + send_user(status[:user]) + @callback.call( + type: "tweet", + id: status[:id], + text: format_text(status), + source: format_source(status), + tweeted_at: status[:created_at], + user_id: status[:user][:id] + ) + log(:debug, "Sent tweet", status[:id]) + end + + def send_favorite(source, target_object) + send_tweet(target_object) + send_user(source) + @callback.call( + type: "favorite", + tweet_id: target_object[:id], + user_id: source[:id] + ) + log(:debug, "Sent favorite", source[:id] => target_object[:id]) + end + + def send_unfavorite(source, target_object) + @callback.call( + type: "delete", + tweet_id: target_object[:id], + user_id: source[:id] + ) + log(:debug, "Sent unfavorite", source[:id] => target_object[:id]) + end + + def send_retweet(status) + send_tweet(status[:retweeted_status]) + send_user(status[:user]) + @callback.call( + type: "retweet", + id: status[:id], + tweet_id: status[:retweeted_status][:id], + user_id: status[:user][:id] + ) + log(:debug, "Sent retweet", status[:user][:id] => status[:retweeted_status][:id]) + end + + def send_delete(deleted_status_id, deleted_user_id) + @callback.call( + :type => "delete", + :id => deleted_status_id, + :user_id => deleted_user_id + ) + log(:debug, "Sent delete", deleted_user_id => deleted_status_id) + end + end + end +end diff --git a/collector/worker.rb b/collector/worker.rb index 9d57e84..6c901d9 100644 --- a/collector/worker.rb +++ b/collector/worker.rb @@ -1,362 +1,29 @@ # -*- coding: utf-8 -*- -require "em-twitter" -require "yajl" -require "msgpack" -require "./settings" -require "./logger" +require "logger" +require "./connection" -class Worker - class DBProxyClient < EM::Connection - def send_object(data) - send_data(data.to_msgpack) - end - - def initialize - @clients = {} - @pac = MessagePack::Unpacker.new - @excludes = [] - end - - def escape_colon(str); str.gsub(":", "%3A").gsub("<", "%3C").gsub(">", "%3E"); end - - def format_text(status) - chars = status[:text].to_s.split(//) - - entities = status[:entities].map{|k, v| v.map{|n| n.update(:type => k)}}.flatten.sort_by{|entity| entity[:indices].first} - - result = [] - last_index = entities.inject(0) do |last_index, entity| - result << chars[last_index...entity[:indices].first] - case entity[:type] - when :urls, :media - result << "<url:#{escape_colon(entity[:expanded_url])}:#{escape_colon(entity[:display_url])}>" - when :hashtags - result << "<hashtag:#{escape_colon(entity[:text])}>" - when :user_mentions - result << "<mention:#{escape_colon(entity[:screen_name])}>" - when :symbols - result << "<symbol:#{escape_colon(entity[:text])}>" - end - - entity[:indices].last - end - result << chars[last_index..-1] - - result.flatten.join - end - - def format_source(status) - if status[:source].index("<a") - url = status[:source].scan(/href="(.+?)"/).flatten.first - name = status[:source].scan(/>(.+?)</).flatten.first - "<url:#{escape_colon(url)}:#{escape_colon(name)}>" - else - status[:source] - end - end - - def favbot?(source, target_object) - favs = source[:favourites_count] - tweets = source[:statuses_count] - bio = source[:description] - name = source[:name] - # えたふぉ - if (Time.now - Time.parse(target_object[:created_at])) < 2 - case - when - (tweets < 1000 && favs > tweets * 2), - (source[:friends_count] > source[:followers_count] * 10 && favs > tweets) - return true - end - end - if - case - when - favs > tweets * 30, - (tweets < 100 && favs > tweets * 2), - (source[:default_profile_image] == true && favs > tweets * 2) - return true - end - end - - false - end - - def bot?(status) - sources = [ - /^<a href="http:\/\/twittbot\.net\/" rel="nofollow">twittbot\.net<\/a>$/, - /^<a href="http:\/\/botmaker\.dplays\.net\/" rel="nofollow">BotMaker<\/a>$/, - /^<a href="http:\/\/makebot\.sh\/" rel="nofollow">makebot\.sh( [0-9])?<\/a>$/, - /^<a href="http:\/\/lbattery\.dip\.jp\/twihaialert\/" rel="nofollow">ツイ廃あらーと<\/a>$/, - /^<a href="http:\/\/app\.xmgn\.com\/tweetcounter\/" rel="nofollow">ツイート数カウントくん<\/a>$/, - /^<a href="http:\/\/gumu-lab\.com\/replychecker\/" rel="nofollow">リプライ数チェッカ<\/a>$/, - /^<a href="http:\/\/bit.ly\/SiHFe6" rel="nofollow">劣化コピー<\/a>$/, - ] - sources.any?{|r| r =~ status[:source]} - end - - def send_spam(user) - send_object({ - type: "spam", - id: user[:id] - }) - end - - def send_user(user) - send_object({ - :type => "user", - :id => user[:id], - :screen_name => user[:screen_name], - :name => user[:name], - :profile_image_url => user[:profile_image_url_https], - :protected => user[:protected] - }) - end - - def send_tweet(status) - unless bot?(status) - send_user(status[:user]) - send_object({ - :type => "tweet", - :id => status[:id], - :text => format_text(status), - :source => format_source(status), - :tweeted_at => status[:created_at], - :user_id => status[:user][:id] - }) - end - end - - def send_favorite(source, target_object) - if @excludes.include?(source[:id]) - # ignored - else - if favbot?(source, target_object) - # ignored - send_spam(source) - $logger.info("Add #{source[:screen_name]}(##{source[:id]}) to ignore list") - @excludes << source[:id] - elsif !bot?(target_object) - send_tweet(target_object) - send_user(source) - send_object({ - :type => "favorite", - :tweet_id => target_object[:id], - :user_id => source[:id] - }) - end - end - end - - def send_unfavorite(source, target_object) - send_object({ - :type => "delete", - :tweet_id => target_object[:id], - :user_id => source[:id] - }) - end - - def send_retweet(status) - unless bot?(status[:retweeted_status]) - send_tweet(status[:retweeted_status]) - send_user(status[:user]) - send_object({ - :type => "retweet", - :id => status[:id], - :tweet_id => status[:retweeted_status][:id], - :user_id => status[:user][:id] - }) - end - end - - def send_delete(deleted_status_id, deleted_user_id) - send_object({ - :type => "delete", - :id => deleted_status_id, - :user_id => deleted_user_id - }) - end - ### end - send to receiver - - def receive_account(msg) - user_id = msg["user_id"] - account_id = msg["id"] - - conopts = {:host => "userstream.twitter.com", - :path => "/1.1/user.json", - :oauth => { - :consumer_key => Settings.consumer[msg["consumer_version"].to_i].key, - :consumer_secret => Settings.consumer[msg["consumer_version"].to_i].secret, - :token => msg["oauth_token"], - :token_secret => msg["oauth_token_secret"]}, - :method => "GET"} - if @clients[account_id] - unless @clients[account_id].options[:oauth][:token] == conopts[:oauth][:token] - @clients[account_id].connection.update(conopts) - $logger.info("Updated(##{account_id}/#{user_id}/#{msg["consumer_version"].to_i})") - else - $logger.info("Not Updated(##{account_id}/#{user_id}/#{msg["consumer_version"].to_i})") - end - return - end - @clients[account_id] = client = EM::Twitter::Client.new(conopts) - - client.on_error do |message| - $logger.warn("Unknown Error(##{account_id}/#{user_id}): #{message}") +module Aclog + module Collector + class Worker + def initialize + @logger = Logger.new(STDOUT) + @logger.level = Settings.env == "development" ? Logger::DEBUG : Logger::INFO end - client.on_unauthorized do - # revoked? - $logger.warn("Unauthorized(##{account_id}/#{user_id})") - out = {:type => "unauthorized", :user_id => user_id, :id => account_id} - send_object(out) - client.connection.stop - @clients.delete(account_id) - end - - client.on_enhance_your_calm do - # limit? - $logger.warn("Enhance your calm(##{account_id}/#{user_id})") - end - - client.on_no_data_received do - # (?) - $logger.warn("No data received(##{account_id}/#{user_id})") - client.close_connection - end + def start + EM.run do + connection = EM.connect(Settings.receiver_host, Settings.receiver_port, Aclog::Collector::Connection, @logger) - client.each do |chunk| - begin - hash = Yajl::Parser.parse(chunk, :symbolize_keys => true) - rescue Yajl::ParseError - $logger.warn("Unexpected chunk(##{account_id}/#{user_id}): #{chunk}") - next - end - - if hash[:warning] - $logger.info("Stall warning(##{account_id}/#{user_id}): #{hash[:warning]}") - elsif hash[:delete] && hash[:delete][:status] - deleted_status_id = hash[:delete][:status][:id] - deleted_user_id = hash[:delete][:status][:user_id] - send_delete(deleted_status_id, deleted_user_id) - $logger.debug("Delete(##{account_id}/#{user_id}): #{deleted_user_id} => #{deleted_status_id}") - elsif hash[:limit] - $logger.warn("UserStreams Limit Event(##{account_id}/#{user_id}): #{hash[:limit][:track]}") - elsif hash[:event] - case hash[:event] - when "favorite" - source = hash[:source] - target_object = hash[:target_object] - if target_object && target_object[:user] && - (!target_object[:user][:protected] || - target_object[:user][:id] == user_id) - send_favorite(source, target_object) - $logger.debug("Favorite(##{account_id}/#{user_id}): #{source[:screen_name]} => #{target_object[:id]}") - end - when "unfavorite" - send_unfavorite(hash[:source], hash[:target_object]) - $logger.debug("Unfavorite(##{account_id}/#{user_id}): #{hash[:source][:screen_name]} => #{hash[:target_object][:id]}") - end - elsif hash[:user] - # tweet - if hash[:retweeted_status] - if hash[:retweeted_status][:user][:id] == user_id || hash[:user][:id] == user_id - send_retweet(hash) - $logger.debug("Retweet(##{account_id}/#{user_id}): #{hash[:user][:screen_name]} => #{hash[:retweeted_status][:id]}") - end - elsif hash[:user][:id] == user_id - # update: exclude not favorited tweet - send_tweet(hash) - $logger.debug("Tweet(##{account_id}/#{user_id}): #{hash[:user][:screen_name]} => #{hash[:id]}") + stop = proc do + @logger.info("Quitting collector...") + connection.quit + EM.stop end - elsif hash[:friends] - # monyo - else - $logger.debug("??(##{account_id}/#{user_id})") - end - end - client.on_reconnect do |timeout, retries| - $logger.warn("Reconnected(##{account_id}/#{user_id}): #{retries}") - end - - client.on_max_reconnects do |timeout, retries| - $logger.warn("Max reconnects(##{account_id}/#{user_id}): #{retries}") - client.connection.stop - @clients.delete(account_id) - end - - client.connect - $logger.info("Connected(##{account_id}/#{user_id}/#{msg["consumer_version"].to_i})") - end - - def post_init - out = {:type => "init", - :secret_key => Settings.secret_key, - :worker_number => Settings.worker_number} - send_object(out) - end - - def unbind - $logger.info("Connection closed") - EM.add_timer(10) do - reconnect(Settings.receiver_host, Settings.receiver_port) - post_init - end - end - - def receive_data(data) - @pac.feed_each(data) do |msg| - unless msg.is_a?(Hash) && msg["type"] - $logger.warn("Unknown data: #{msg}") - return + Signal.trap(:INT, &stop) + Signal.trap(:TERM, &stop) end - - case msg["type"] - when "ok" - $logger.info("ok: #{msg["message"]}") - when "error" - $logger.info("error: #{msg["message"]}") - when "fatal" - $logger.info("fatal: #{msg["message"]}") - when "bye" - $logger.info("bye: #{msg["message"]}") - when "account" - begin - receive_account(msg) - rescue - $logger.error($!) - $logger.error($@) - end - else - $logger.info("Unknown message type: #{msg}") - end - end - end - - def stop_all - @clients.map{|k, v| v.connection.stop} - send_object({:type => "quit", :reason => "stop_all"}) - end - end - - def initialize - $logger = Aclog::Logger.new(:info) - end - - def start - $logger.info("Worker ##{Settings.worker_number} started") - EM.run do - connection = EM.connect(Settings.receiver_host, Settings.receiver_port, DBProxyClient) - - stop = Proc.new do - connection.stop_all - EM.stop end - Signal.trap(:INT, &stop) - Signal.trap(:TERM, &stop) end end end - - |