aboutsummaryrefslogtreecommitdiffstats
path: root/collector
diff options
context:
space:
mode:
authorrhenium <re4k@re4k.info>2013-05-11 08:56:02 +0900
committerrhenium <re4k@re4k.info>2013-05-11 08:56:02 +0900
commit73fc0f1c0fbcc71743df2b8fba3c155986cf4cf9 (patch)
tree4e0694e4b15efd510e1461f578f15521ff3d8d14 /collector
parentbd35c7dad0c8481f508e21815faa1f2d41287465 (diff)
downloadaclog-73fc0f1c0fbcc71743df2b8fba3c155986cf4cf9.tar.gz
update collector
Diffstat (limited to 'collector')
-rw-r--r--collector/connection.rb81
-rw-r--r--collector/helper.rb45
-rw-r--r--collector/logger.rb44
-rw-r--r--collector/settings.rb6
-rwxr-xr-xcollector/start.rb2
-rw-r--r--collector/stream.rb184
-rw-r--r--collector/worker.rb367
7 files changed, 333 insertions, 396 deletions
diff --git a/collector/connection.rb b/collector/connection.rb
new file mode 100644
index 0000000..8a06b3f
--- /dev/null
+++ b/collector/connection.rb
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+require "msgpack"
+require "./settings"
+require "./stream"
+
+module Aclog
+ module Collector
+ class Connection < EM::Connection
+ def initialize(logger)
+ @logger = logger
+ @clients = {}
+ @pac = MessagePack::Unpacker.new
+ @connected = false
+ end
+
+ def post_init
+ send_object(
+ type: "init",
+ secret_key: Settings.secret_key,
+ worker_number: Settings.worker_number
+ )
+ end
+
+ def unbind
+ @logger.info("Connection closed") if @connected
+ EM.add_timer(10) do
+ @connected = false
+ reconnect(Settings.receiver_host, Settings.receiver_port)
+ post_init
+ end
+ end
+
+ # Server
+ def receive_data(data)
+ @pac.feed_each(data) do |msg|
+ if not msg.is_a?(Hash) or not msg["type"]
+ @logger.warn("Unknown data: #{msg}")
+ close_connection_after_writing
+ return
+ end
+
+ case msg["type"]
+ when "ok"
+ @connected = true
+ @logger.info("Connected with server")
+ when "error"
+ @logger.info("error: #{msg["message"]}")
+ when "fatal"
+ @logger.info("fatal: #{msg["message"]}")
+ close_connection
+ when "bye"
+ close_connection
+ when "account"
+ account_id = msg["id"]
+ if not @clients[account_id]
+ user_connection = Aclog::Collector::Stream.new(@logger, method(:send_object), msg)
+ user_connection.start
+ @clients[account_id] = user_connection
+ else
+ @clients[account_id].update(msg)
+ end
+ else
+ @logger.info("Unknown message: #{msg}")
+ end
+ end
+ end
+
+ def quit
+ send_object(type: "quit", reason: "stop")
+ @clients.values.map(&:stop)
+ end
+
+ private
+ def send_object(data)
+ send_data(data.to_msgpack)
+ end
+ end
+ end
+end
+
+
diff --git a/collector/helper.rb b/collector/helper.rb
new file mode 100644
index 0000000..30bdb9f
--- /dev/null
+++ b/collector/helper.rb
@@ -0,0 +1,45 @@
+# -*- encoding: utf-8 -*-
+
+module Aclog
+ module Collector
+ module Helper
+ def format_text(status)
+ text = status[:text]
+ entities = status[:entities].map{|k, v| v.map{|n| n.update(type: k)}}.flatten.sort_by{|entity| entity[:indices].first}
+
+ result = ""
+ last_index = entities.inject(0) do |last_index, entity|
+ result << text[last_index...entity[:indices].first]
+ case entity[:type]
+ when :urls, :media
+ result << "<url:#{escape_colon(entity[:expanded_url])}:#{escape_colon(entity[:display_url])}>"
+ when :hashtags
+ result << "<hashtag:#{escape_colon(entity[:text])}>"
+ when :user_mentions
+ result << "<mention:#{escape_colon(entity[:screen_name])}>"
+ when :symbols
+ result << "<symbol:#{escape_colon(entity[:text])}>"
+ end
+
+ entity[:indices].last
+ end
+ result << text[last_index..-1]
+
+ result
+ end
+
+ def format_source(status)
+ if status[:source].include?("<a")
+ url, name = status[:source].scan(/<a href="(.+?)" rel="nofollow">(.+?)<\/a>/).flatten
+ "<url:#{escape_colon(url)}:#{escape_colon(name)}>"
+ else
+ # web, txt, ..
+ status[:source]
+ end
+ end
+
+ private
+ def escape_colon(str); str.gsub(":", "%3A").gsub("<", "%3C").gsub(">", "%3E") end
+ end
+ end
+end
diff --git a/collector/logger.rb b/collector/logger.rb
deleted file mode 100644
index eb09894..0000000
--- a/collector/logger.rb
+++ /dev/null
@@ -1,44 +0,0 @@
-module Aclog
- class Logger
- def debug(msg)
- if @level == :debug
- log(@out, "DEBUG", msg)
- end
- end
-
- def info(msg)
- unless @level == :none
- @level == :error
- @level == :warn
- log(@out, "INFO", msg)
- end
- end
-
- def warn(msg)
- unless @level == :none
- @level == :error
- log(@err, "WARN", msg)
- end
- end
-
- def error(msg)
- unless @level == :none
- log(@err, "ERROR", msg)
- end
- end
-
- def fatal(msg)
- log(@err, "FATAL", msg)
- end
-
- def log(out, type, msg)
- out.puts Time.now.utc.iso8601(3) + " " + type + ": " + msg.to_s
- end
-
- def initialize(level = :warn)
- @out = STDOUT
- @err = STDERR
- @level = level
- end
- end
-end
diff --git a/collector/settings.rb b/collector/settings.rb
index 562bd75..71c8c28 100644
--- a/collector/settings.rb
+++ b/collector/settings.rb
@@ -1,6 +1,10 @@
require "settingslogic"
class Settings < Settingslogic
+ def self.env
+ ENV["RAILS_ENV"] || "development"
+ end
+
source "settings.yml"
- namespace ENV["RAILS_ENV"]
+ namespace env
end
diff --git a/collector/start.rb b/collector/start.rb
index e8c7242..33c0827 100755
--- a/collector/start.rb
+++ b/collector/start.rb
@@ -4,6 +4,6 @@ require "./worker"
$stdout.sync = true
$stderr.sync = true
-worker = Worker.new
+worker = Aclog::Collector::Worker.new
worker.start
diff --git a/collector/stream.rb b/collector/stream.rb
new file mode 100644
index 0000000..5220471
--- /dev/null
+++ b/collector/stream.rb
@@ -0,0 +1,184 @@
+# -*- coding: utf-8 -*-
+require "em-twitter"
+require "yajl"
+require "./settings"
+require "./helper"
+
+module Aclog
+ module Collector
+ class Stream
+ include Aclog::Collector::Helper
+ attr_reader :client
+ attr_accessor :logger
+
+ def initialize(logger, callback, hash)
+ @logger = logger
+ @user_id = hash["user_id"]
+ @account_id = hash["id"]
+ @callback = callback
+ ready_client(hash)
+ end
+
+ def ready_client(hash)
+ @client = EM::Twitter::Client.new(gopts(hash))
+ @client.on_error {|message|
+ log(:error, "Unknown Error", message) }
+ @client.on_enhance_your_calm {
+ log(:warn, "Enhance your calm") }
+ @client.on_no_data_received {
+ log(:warn, "No data received") }
+ @client.on_reconnect {|timeout, retries|
+ log(:warn, "Reconnected", retries) }
+ @client.on_max_reconnects {|timeout, retries|
+ log(:warn, "Reached max reconnects", retries) }
+ @client.on_unauthorized {
+ log(:warn, "Unauthorized")
+ @callback.call(type: "unauthorized", user_id: @user_id, id: @account_id) }
+
+ @client.each do |chunk|
+ begin
+ hash = Yajl::Parser.parse(chunk, :symbolize_keys => true)
+ rescue Yajl::ParseError
+ log(:warn, "Unexpected chunk", chunk)
+ next
+ end
+
+ if hash[:warning]
+ log(:info, "Stall warning", hash[:warning])
+ elsif hash[:delete]
+ if d = hash[:delete][:status]
+ send_delete(d[:id], d[:user_id])
+ end
+ elsif hash[:limit]
+ log(:warn, "UserStreams Limit", hash[:limit][:track])
+ elsif hash[:event]
+ # event
+ case hash[:event]
+ when "favorite"
+ send_favorite(hash[:source], hash[:target_object])
+ when "unfavorite"
+ send_unfavorite(hash[:source], hash[:target_object])
+ end
+ elsif hash[:user]
+ # tweet
+ if hash[:retweeted_status]
+ if hash[:retweeted_status][:user][:id] == @user_id ||
+ hash[:user][:id] == @user_id
+ send_retweet(hash)
+ end
+ elsif hash[:user][:id] == @user_id
+ send_tweet(hash)
+ end
+ elsif hash[:friends]
+ # maybe first message
+ log(:debug, "Received friends", hash[:friends].size)
+ else
+ log(:info, "Unexpected UserStreams data", hash)
+ end
+ end
+ @client
+ end
+
+ def start
+ @client.connect
+ log(:info, "Connected")
+ end
+
+ def update(hash)
+ opts = gopts(hash)
+ if opts[:oauth][:token] != @client.options[:oauth][:token]
+ @client.connection.update(opts)
+ log(:info, "Connection updated")
+ end
+ end
+
+ def stop
+ @client.connection.stop
+ end
+
+ private
+ def log(level, msg, data = nil)
+ @logger.send(level, "#{msg}(##{@account_id}/#{@user_id}): #{data}")
+ end
+
+ def gopts(msg)
+ {
+ host: "userstream.twitter.com",
+ path: "/1.1/user.json",
+ oauth: {
+ consumer_key: Settings.consumer[msg["consumer_version"]].key,
+ consumer_secret: Settings.consumer[msg["consumer_version"]].secret,
+ token: msg["oauth_token"],
+ token_secret: msg["oauth_token_secret"]},
+ method: :get
+ }
+ end
+
+ def send_user(user)
+ @callback.call(
+ type: "user",
+ id: user[:id],
+ screen_name: user[:screen_name],
+ name: user[:name],
+ profile_image_url: user[:profile_image_url_https],
+ protected: user[:protected]
+ )
+ log(:debug, "Sent user", user[:id])
+ end
+
+ def send_tweet(status)
+ send_user(status[:user])
+ @callback.call(
+ type: "tweet",
+ id: status[:id],
+ text: format_text(status),
+ source: format_source(status),
+ tweeted_at: status[:created_at],
+ user_id: status[:user][:id]
+ )
+ log(:debug, "Sent tweet", status[:id])
+ end
+
+ def send_favorite(source, target_object)
+ send_tweet(target_object)
+ send_user(source)
+ @callback.call(
+ type: "favorite",
+ tweet_id: target_object[:id],
+ user_id: source[:id]
+ )
+ log(:debug, "Sent favorite", source[:id] => target_object[:id])
+ end
+
+ def send_unfavorite(source, target_object)
+ @callback.call(
+ type: "delete",
+ tweet_id: target_object[:id],
+ user_id: source[:id]
+ )
+ log(:debug, "Sent unfavorite", source[:id] => target_object[:id])
+ end
+
+ def send_retweet(status)
+ send_tweet(status[:retweeted_status])
+ send_user(status[:user])
+ @callback.call(
+ type: "retweet",
+ id: status[:id],
+ tweet_id: status[:retweeted_status][:id],
+ user_id: status[:user][:id]
+ )
+ log(:debug, "Sent retweet", status[:user][:id] => status[:retweeted_status][:id])
+ end
+
+ def send_delete(deleted_status_id, deleted_user_id)
+ @callback.call(
+ :type => "delete",
+ :id => deleted_status_id,
+ :user_id => deleted_user_id
+ )
+ log(:debug, "Sent delete", deleted_user_id => deleted_status_id)
+ end
+ end
+ end
+end
diff --git a/collector/worker.rb b/collector/worker.rb
index 9d57e84..6c901d9 100644
--- a/collector/worker.rb
+++ b/collector/worker.rb
@@ -1,362 +1,29 @@
# -*- coding: utf-8 -*-
-require "em-twitter"
-require "yajl"
-require "msgpack"
-require "./settings"
-require "./logger"
+require "logger"
+require "./connection"
-class Worker
- class DBProxyClient < EM::Connection
- def send_object(data)
- send_data(data.to_msgpack)
- end
-
- def initialize
- @clients = {}
- @pac = MessagePack::Unpacker.new
- @excludes = []
- end
-
- def escape_colon(str); str.gsub(":", "%3A").gsub("<", "%3C").gsub(">", "%3E"); end
-
- def format_text(status)
- chars = status[:text].to_s.split(//)
-
- entities = status[:entities].map{|k, v| v.map{|n| n.update(:type => k)}}.flatten.sort_by{|entity| entity[:indices].first}
-
- result = []
- last_index = entities.inject(0) do |last_index, entity|
- result << chars[last_index...entity[:indices].first]
- case entity[:type]
- when :urls, :media
- result << "<url:#{escape_colon(entity[:expanded_url])}:#{escape_colon(entity[:display_url])}>"
- when :hashtags
- result << "<hashtag:#{escape_colon(entity[:text])}>"
- when :user_mentions
- result << "<mention:#{escape_colon(entity[:screen_name])}>"
- when :symbols
- result << "<symbol:#{escape_colon(entity[:text])}>"
- end
-
- entity[:indices].last
- end
- result << chars[last_index..-1]
-
- result.flatten.join
- end
-
- def format_source(status)
- if status[:source].index("<a")
- url = status[:source].scan(/href="(.+?)"/).flatten.first
- name = status[:source].scan(/>(.+?)</).flatten.first
- "<url:#{escape_colon(url)}:#{escape_colon(name)}>"
- else
- status[:source]
- end
- end
-
- def favbot?(source, target_object)
- favs = source[:favourites_count]
- tweets = source[:statuses_count]
- bio = source[:description]
- name = source[:name]
- # えたふぉ
- if (Time.now - Time.parse(target_object[:created_at])) < 2
- case
- when
- (tweets < 1000 && favs > tweets * 2),
- (source[:friends_count] > source[:followers_count] * 10 && favs > tweets)
- return true
- end
- end
- if
- case
- when
- favs > tweets * 30,
- (tweets < 100 && favs > tweets * 2),
- (source[:default_profile_image] == true && favs > tweets * 2)
- return true
- end
- end
-
- false
- end
-
- def bot?(status)
- sources = [
- /^<a href="http:\/\/twittbot\.net\/" rel="nofollow">twittbot\.net<\/a>$/,
- /^<a href="http:\/\/botmaker\.dplays\.net\/" rel="nofollow">BotMaker<\/a>$/,
- /^<a href="http:\/\/makebot\.sh\/" rel="nofollow">makebot\.sh( [0-9])?<\/a>$/,
- /^<a href="http:\/\/lbattery\.dip\.jp\/twihaialert\/" rel="nofollow">ツイ廃あらーと<\/a>$/,
- /^<a href="http:\/\/app\.xmgn\.com\/tweetcounter\/" rel="nofollow">ツイート数カウントくん<\/a>$/,
- /^<a href="http:\/\/gumu-lab\.com\/replychecker\/" rel="nofollow">リプライ数チェッカ<\/a>$/,
- /^<a href="http:\/\/bit.ly\/SiHFe6" rel="nofollow">劣化コピー<\/a>$/,
- ]
- sources.any?{|r| r =~ status[:source]}
- end
-
- def send_spam(user)
- send_object({
- type: "spam",
- id: user[:id]
- })
- end
-
- def send_user(user)
- send_object({
- :type => "user",
- :id => user[:id],
- :screen_name => user[:screen_name],
- :name => user[:name],
- :profile_image_url => user[:profile_image_url_https],
- :protected => user[:protected]
- })
- end
-
- def send_tweet(status)
- unless bot?(status)
- send_user(status[:user])
- send_object({
- :type => "tweet",
- :id => status[:id],
- :text => format_text(status),
- :source => format_source(status),
- :tweeted_at => status[:created_at],
- :user_id => status[:user][:id]
- })
- end
- end
-
- def send_favorite(source, target_object)
- if @excludes.include?(source[:id])
- # ignored
- else
- if favbot?(source, target_object)
- # ignored
- send_spam(source)
- $logger.info("Add #{source[:screen_name]}(##{source[:id]}) to ignore list")
- @excludes << source[:id]
- elsif !bot?(target_object)
- send_tweet(target_object)
- send_user(source)
- send_object({
- :type => "favorite",
- :tweet_id => target_object[:id],
- :user_id => source[:id]
- })
- end
- end
- end
-
- def send_unfavorite(source, target_object)
- send_object({
- :type => "delete",
- :tweet_id => target_object[:id],
- :user_id => source[:id]
- })
- end
-
- def send_retweet(status)
- unless bot?(status[:retweeted_status])
- send_tweet(status[:retweeted_status])
- send_user(status[:user])
- send_object({
- :type => "retweet",
- :id => status[:id],
- :tweet_id => status[:retweeted_status][:id],
- :user_id => status[:user][:id]
- })
- end
- end
-
- def send_delete(deleted_status_id, deleted_user_id)
- send_object({
- :type => "delete",
- :id => deleted_status_id,
- :user_id => deleted_user_id
- })
- end
- ### end - send to receiver
-
- def receive_account(msg)
- user_id = msg["user_id"]
- account_id = msg["id"]
-
- conopts = {:host => "userstream.twitter.com",
- :path => "/1.1/user.json",
- :oauth => {
- :consumer_key => Settings.consumer[msg["consumer_version"].to_i].key,
- :consumer_secret => Settings.consumer[msg["consumer_version"].to_i].secret,
- :token => msg["oauth_token"],
- :token_secret => msg["oauth_token_secret"]},
- :method => "GET"}
- if @clients[account_id]
- unless @clients[account_id].options[:oauth][:token] == conopts[:oauth][:token]
- @clients[account_id].connection.update(conopts)
- $logger.info("Updated(##{account_id}/#{user_id}/#{msg["consumer_version"].to_i})")
- else
- $logger.info("Not Updated(##{account_id}/#{user_id}/#{msg["consumer_version"].to_i})")
- end
- return
- end
- @clients[account_id] = client = EM::Twitter::Client.new(conopts)
-
- client.on_error do |message|
- $logger.warn("Unknown Error(##{account_id}/#{user_id}): #{message}")
+module Aclog
+ module Collector
+ class Worker
+ def initialize
+ @logger = Logger.new(STDOUT)
+ @logger.level = Settings.env == "development" ? Logger::DEBUG : Logger::INFO
end
- client.on_unauthorized do
- # revoked?
- $logger.warn("Unauthorized(##{account_id}/#{user_id})")
- out = {:type => "unauthorized", :user_id => user_id, :id => account_id}
- send_object(out)
- client.connection.stop
- @clients.delete(account_id)
- end
-
- client.on_enhance_your_calm do
- # limit?
- $logger.warn("Enhance your calm(##{account_id}/#{user_id})")
- end
-
- client.on_no_data_received do
- # (?)
- $logger.warn("No data received(##{account_id}/#{user_id})")
- client.close_connection
- end
+ def start
+ EM.run do
+ connection = EM.connect(Settings.receiver_host, Settings.receiver_port, Aclog::Collector::Connection, @logger)
- client.each do |chunk|
- begin
- hash = Yajl::Parser.parse(chunk, :symbolize_keys => true)
- rescue Yajl::ParseError
- $logger.warn("Unexpected chunk(##{account_id}/#{user_id}): #{chunk}")
- next
- end
-
- if hash[:warning]
- $logger.info("Stall warning(##{account_id}/#{user_id}): #{hash[:warning]}")
- elsif hash[:delete] && hash[:delete][:status]
- deleted_status_id = hash[:delete][:status][:id]
- deleted_user_id = hash[:delete][:status][:user_id]
- send_delete(deleted_status_id, deleted_user_id)
- $logger.debug("Delete(##{account_id}/#{user_id}): #{deleted_user_id} => #{deleted_status_id}")
- elsif hash[:limit]
- $logger.warn("UserStreams Limit Event(##{account_id}/#{user_id}): #{hash[:limit][:track]}")
- elsif hash[:event]
- case hash[:event]
- when "favorite"
- source = hash[:source]
- target_object = hash[:target_object]
- if target_object && target_object[:user] &&
- (!target_object[:user][:protected] ||
- target_object[:user][:id] == user_id)
- send_favorite(source, target_object)
- $logger.debug("Favorite(##{account_id}/#{user_id}): #{source[:screen_name]} => #{target_object[:id]}")
- end
- when "unfavorite"
- send_unfavorite(hash[:source], hash[:target_object])
- $logger.debug("Unfavorite(##{account_id}/#{user_id}): #{hash[:source][:screen_name]} => #{hash[:target_object][:id]}")
- end
- elsif hash[:user]
- # tweet
- if hash[:retweeted_status]
- if hash[:retweeted_status][:user][:id] == user_id || hash[:user][:id] == user_id
- send_retweet(hash)
- $logger.debug("Retweet(##{account_id}/#{user_id}): #{hash[:user][:screen_name]} => #{hash[:retweeted_status][:id]}")
- end
- elsif hash[:user][:id] == user_id
- # update: exclude not favorited tweet
- send_tweet(hash)
- $logger.debug("Tweet(##{account_id}/#{user_id}): #{hash[:user][:screen_name]} => #{hash[:id]}")
+ stop = proc do
+ @logger.info("Quitting collector...")
+ connection.quit
+ EM.stop
end
- elsif hash[:friends]
- # monyo
- else
- $logger.debug("??(##{account_id}/#{user_id})")
- end
- end
- client.on_reconnect do |timeout, retries|
- $logger.warn("Reconnected(##{account_id}/#{user_id}): #{retries}")
- end
-
- client.on_max_reconnects do |timeout, retries|
- $logger.warn("Max reconnects(##{account_id}/#{user_id}): #{retries}")
- client.connection.stop
- @clients.delete(account_id)
- end
-
- client.connect
- $logger.info("Connected(##{account_id}/#{user_id}/#{msg["consumer_version"].to_i})")
- end
-
- def post_init
- out = {:type => "init",
- :secret_key => Settings.secret_key,
- :worker_number => Settings.worker_number}
- send_object(out)
- end
-
- def unbind
- $logger.info("Connection closed")
- EM.add_timer(10) do
- reconnect(Settings.receiver_host, Settings.receiver_port)
- post_init
- end
- end
-
- def receive_data(data)
- @pac.feed_each(data) do |msg|
- unless msg.is_a?(Hash) && msg["type"]
- $logger.warn("Unknown data: #{msg}")
- return
+ Signal.trap(:INT, &stop)
+ Signal.trap(:TERM, &stop)
end
-
- case msg["type"]
- when "ok"
- $logger.info("ok: #{msg["message"]}")
- when "error"
- $logger.info("error: #{msg["message"]}")
- when "fatal"
- $logger.info("fatal: #{msg["message"]}")
- when "bye"
- $logger.info("bye: #{msg["message"]}")
- when "account"
- begin
- receive_account(msg)
- rescue
- $logger.error($!)
- $logger.error($@)
- end
- else
- $logger.info("Unknown message type: #{msg}")
- end
- end
- end
-
- def stop_all
- @clients.map{|k, v| v.connection.stop}
- send_object({:type => "quit", :reason => "stop_all"})
- end
- end
-
- def initialize
- $logger = Aclog::Logger.new(:info)
- end
-
- def start
- $logger.info("Worker ##{Settings.worker_number} started")
- EM.run do
- connection = EM.connect(Settings.receiver_host, Settings.receiver_port, DBProxyClient)
-
- stop = Proc.new do
- connection.stop_all
- EM.stop
end
- Signal.trap(:INT, &stop)
- Signal.trap(:TERM, &stop)
end
end
end
-
-