aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRhenium <rhenium@rhe.jp>2013-12-20 21:42:05 +0900
committerRhenium <rhenium@rhe.jp>2013-12-20 21:42:05 +0900
commit4b56cd6b67d890c5e2ed763656a7fd7832b10d8f (patch)
treeeb394e9c187b8bbc60b54d16b72200edbf9380ab
parent376c50784c2c740c6caacd1e94550e3d9d77fe41 (diff)
downloadaclog-4b56cd6b67d890c5e2ed763656a7fd7832b10d8f.tar.gz
update workers
-rw-r--r--app/models/favorite.rb12
-rw-r--r--app/models/retweet.rb8
-rw-r--r--app/models/tweet.rb39
-rw-r--r--app/models/user.rb2
-rw-r--r--collector/Gemfile.lock10
-rw-r--r--collector/connection.rb130
-rw-r--r--collector/helper.rb46
-rw-r--r--collector/settings.rb17
-rw-r--r--collector/settings.yml.example6
-rwxr-xr-xcollector/start.rb6
-rw-r--r--collector/stream.rb182
-rw-r--r--collector/user_stream.rb165
-rw-r--r--collector/worker.rb11
-rw-r--r--lib/aclog/receiver/collector_connection.rb149
-rw-r--r--lib/aclog/receiver/register_server.rb1
-rw-r--r--lib/aclog/receiver/worker.rb6
16 files changed, 368 insertions, 422 deletions
diff --git a/app/models/favorite.rb b/app/models/favorite.rb
index 44a8ae4..08ee580 100644
--- a/app/models/favorite.rb
+++ b/app/models/favorite.rb
@@ -12,14 +12,14 @@ class Favorite < ActiveRecord::Base
def self.from_receiver(msg)
transaction do
- t = Tweet.from_receiver(msg["tweet"])
- u = User.from_receiver(msg["user"])
- f = logger.quietly { t.favorites.create!(user: u) }
- logger.debug("Created Favorite: #{msg["user"]["id"]} => #{msg["tweet"]["id"]}")
+ t = Tweet.from_receiver(msg["target_object"])
+ u = User.from_receiver(msg["source"])
+ f = t.favorites.create!(user: u)
+ logger.debug("Created Favorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}")
return f
end
rescue ActiveRecord::RecordNotUnique
- logger.debug("Duplicate Favorite: #{msg["user"]["id"]} => #{msg["tweet"]["id"]}")
+ logger.debug("Duplicate Favorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}")
return nil
rescue => e
logger.error("Unknown error while inserting favorite: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}")
@@ -27,6 +27,6 @@ class Favorite < ActiveRecord::Base
end
def self.delete_from_receiver(msg)
- where(tweet_id: msg["tweet"]["id"], user_id: msg["user"]["id"]).destroy_all
+ where(tweet_id: msg["target_object"]["id"], user_id: msg["source"]["id"]).destroy_all
end
end
diff --git a/app/models/retweet.rb b/app/models/retweet.rb
index db937cf..807ca1c 100644
--- a/app/models/retweet.rb
+++ b/app/models/retweet.rb
@@ -12,14 +12,14 @@ class Retweet < ActiveRecord::Base
def self.from_receiver(msg)
transaction do
- t = Tweet.from_receiver(msg["tweet"])
+ t = Tweet.from_receiver(msg["retweeted_status"])
u = User.from_receiver(msg["user"])
- r = logger.quietly { t.retweets.create!(id: msg["id"], user: u) }
- logger.debug("Created Retweet: #{msg["id"]}: #{msg["user"]["id"]} => #{msg["tweet"]["id"]}")
+ r = t.retweets.create!(id: msg["id"], user: u)
+ logger.debug("Created Retweet: #{msg["id"]}: #{msg["user"]["id"]} => #{msg["retweeted_status"]["id"]}")
return r
end
rescue ActiveRecord::RecordNotUnique
- logger.debug("Duplicate Retweet: #{msg["id"]}: #{msg["user"]["id"]} => #{msg["tweet"]["id"]}")
+ logger.debug("Duplicate Retweet: #{msg["id"]}: #{msg["user"]["id"]} => #{msg["retweeted_status"]["id"]}")
return nil
rescue => e
logger.error("Unknown error while inserting retweet: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}")
diff --git a/app/models/tweet.rb b/app/models/tweet.rb
index 78adb2e..2642d3b 100644
--- a/app/models/tweet.rb
+++ b/app/models/tweet.rb
@@ -86,14 +86,12 @@ class Tweet < ActiveRecord::Base
unless t
begin
u = User.from_receiver(msg["user"])
- t = logger.quietly do
- self.create!(id: msg["id"],
- text: msg["text"],
- source: msg["source"],
- tweeted_at: Time.parse(msg["tweeted_at"]),
- in_reply_to_id: msg["in_reply_to_status_id"],
- user: u)
- end
+ t = self.create!(id: msg["id"],
+ text: extract_entities(msg["text"], msg["entities"]),
+ source: msg["source"],
+ tweeted_at: Time.parse(msg["created_at"]),
+ in_reply_to_id: msg["in_reply_to_status_id"],
+ user: u)
logger.debug("Created Tweet: #{msg["id"]}")
rescue ActiveRecord::RecordNotUnique
logger.debug("Duplicate Tweet: #{msg["id"]}")
@@ -129,6 +127,31 @@ class Tweet < ActiveRecord::Base
end
private
+ def self.extract_entities(text, entities)
+ escape_colon = -> str { str.gsub(":", "\\:") }
+ entities = entities.map { |k, v| v.map { |n| n.update("type" => k) } }.flatten.sort_by { |entity| entity["indices"].first }
+
+ result = ""
+ last_index = entities.inject(0) do |last_index_, entity|
+ result << text[last_index_...entity["indices"].first]
+ case entity["type"]
+ when "urls", "media"
+ result << "<url:#{escape_colon.call(entity["expanded_url"])}:#{escape_colon.call(entity["display_url"])}>"
+ when "hashtags"
+ result << "<hashtag:#{entity["text"]}>"
+ when "user_mentions"
+ result << "<mention:#{entity["screen_name"]}>"
+ when "symbols"
+ result << "<symbol:#{entity["text"]}>"
+ end
+
+ entity["indices"].last
+ end
+ result << text[last_index..-1]
+
+ result
+ end
+
def self.parse_condition(token, strings)
tweets = Tweet.arel_table
escape_text = -> str do
diff --git a/app/models/user.rb b/app/models/user.rb
index 983273e..80b4a9d 100644
--- a/app/models/user.rb
+++ b/app/models/user.rb
@@ -30,7 +30,7 @@ class User < ActiveRecord::Base
if att["screen_name"] == user.screen_name &&
att["name"] == user.name &&
- att["profile_image_url"][-44..-1] == user.profile_image_url[-44..-1] &&
+ att["profile_image_url"] == user.profile_image_url &&
att["protected"] == user.protected?
logger.debug("User not changed: #{user.id}")
else
diff --git a/collector/Gemfile.lock b/collector/Gemfile.lock
index 09da2d0..a6eb046 100644
--- a/collector/Gemfile.lock
+++ b/collector/Gemfile.lock
@@ -1,16 +1,16 @@
GEM
remote: https://rubygems.org/
specs:
- em-twitter (0.3.1)
+ em-twitter (0.3.2)
eventmachine (~> 1.0)
- http_parser.rb (>= 0.6.0.beta.2, < 0.7)
+ http_parser.rb (~> 0.6)
simple_oauth (~> 0.2)
eventmachine (1.0.3)
- http_parser.rb (0.6.0.beta.2)
- msgpack (0.5.7)
+ http_parser.rb (0.6.0)
+ msgpack (0.5.8)
settingslogic (2.0.9)
simple_oauth (0.2.0)
- yajl-ruby (1.1.0)
+ yajl-ruby (1.2.0)
PLATFORMS
ruby
diff --git a/collector/connection.rb b/collector/connection.rb
index 9af19fe..f73fbbd 100644
--- a/collector/connection.rb
+++ b/collector/connection.rb
@@ -1,83 +1,89 @@
-# -*- coding: utf-8 -*-
require "msgpack"
require "./settings"
-require "./stream"
+require "./user_stream"
-module Aclog
- module Collector
- class Connection < EM::Connection
- def initialize(logger)
- @logger = logger
- @clients = {}
- @pac = MessagePack::Unpacker.new
- @connected = false
- end
+module Aclog::Collector
+ class Connection < EM::Connection
+ def initialize(logger)
+ @logger = logger
+ @clients = {}
+ @unpacker = MessagePack::Unpacker.new
+ @reconnect = true
+ end
- def post_init
- send_object(type: "init",
- secret_key: Settings.secret_key)
- end
+ def post_init
+ send_object(type: "auth",
+ secret_key: Settings.secret_key)
+ end
+
+ def unbind
+ if @reconnect
+ log(:info, "reconnecting...")
- def unbind
- @logger.info("Connection closed") if @connected
EM.add_timer(10) do
- @connected = false
reconnect(Settings.receiver_host, Settings.receiver_port)
post_init
end
end
+ end
- # Server
- def receive_data(data)
- @pac.feed_each(data) do |msg|
- if not msg.is_a?(Hash) or not msg["type"]
- @logger.warn("Unknown data: #{msg}")
- close_connection_after_writing
- return
- end
+ def receive_data(data)
+ @unpacker.feed_each(data) do |msg|
+ unless msg.is_a?(Hash) && msg["type"]
+ log(:warn, "unknown data: #{msg}")
+ @reconnect = false
+ close_connection
+ return
+ end
- case msg["type"]
- when "ok"
- @connected = true
- @logger.info("Connected with server")
- when "error"
- @logger.info("error: #{msg["message"]}")
- when "fatal"
- @logger.info("fatal: #{msg["message"]}")
- close_connection
- when "bye"
- close_connection
- when "account"
- account_id = msg["id"]
- if not @clients[account_id]
- user_connection = Aclog::Collector::Stream.new(@logger, method(:send_object), msg)
- user_connection.start
- @clients[account_id] = user_connection
- else
- @clients[account_id].update(msg)
- end
- when "stop"
- account_id = msg["id"]
- if @clients[account_id]
- @clients[account_id].stop
- @clients.delete(account_id)
- @logger.info("Received account stop")
- end
+ case msg["type"]
+ when "ok"
+ log(:info, "connection established")
+ when "error"
+ log(:error, "error: #{msg}")
+ when "fatal"
+ log(:fatal, "fatal: #{msg}")
+ @reconnect = false
+ close_connection
+ when "account"
+ account_id = msg["id"]
+ if @clients[account_id]
+ @clients[account_id].update(msg)
else
- @logger.info("Unknown message: #{msg}")
+ stream = UserStream.new(@logger, msg) do |event, data|
+ send_object(data.merge(type: event))
+ end
+ stream.start
+ @clients[account_id] = stream
+ log(:info, "registered: #{account_id}")
+ end
+ when "stop"
+ account_id = msg["id"]
+ client = @clients[account_id]
+ if client
+ client.stop
+ @clients.delete(account_id)
+ log(:info, "unregistered: #{account_id}")
end
+ else
+ log(:warn, "unknown message: #{msg}")
end
end
+ end
- def quit
- send_object(type: "quit", reason: "stop")
- @clients.values.map(&:stop)
- end
+ def quit
+ @reconnect = false
+ send_object(type: "quit", reason: "stop")
+ @clients.values.each(&:stop)
+ end
- private
- def send_object(data)
- send_data(data.to_msgpack)
- end
+ private
+ def send_object(data)
+ send_data(data.to_msgpack)
+ end
+
+ def log(level, message)
+ @logger.__send__(level, "[WORKER] #{message}")
end
end
end
diff --git a/collector/helper.rb b/collector/helper.rb
deleted file mode 100644
index 96903cc..0000000
--- a/collector/helper.rb
+++ /dev/null
@@ -1,46 +0,0 @@
-# -*- encoding: utf-8 -*-
-
-module Aclog
- module Collector
- module Helper
- def format_text(status)
- text = status[:text]
- entities = status[:entities].map{|k, v| v.map{|n| n.update(type: k)}}.flatten.sort_by{|entity| entity[:indices].first}
-
- result = ""
- last_index = entities.inject(0) do |last_index_, entity|
- result << text[last_index_...entity[:indices].first]
- case entity[:type]
- when :urls, :media
- result << "<url:#{escape_colon(entity[:expanded_url])}:#{escape_colon(entity[:display_url])}>"
- when :hashtags
- result << "<hashtag:#{entity[:text]}>"
- when :user_mentions
- result << "<mention:#{entity[:screen_name]}>"
- when :symbols
- result << "<symbol:#{entity[:text]}>"
- end
-
- entity[:indices].last
- end
- result << text[last_index..-1]
-
- result
- end
-
- def format_source(status)
- if status[:source].include?("<a")
- url, name = status[:source].scan(/<a href="(.+?)" rel="nofollow">(.+?)<\/a>/).flatten
- "<url:#{escape_colon(url)}:#{escape_colon(name)}>"
- else
- # web, txt, ..
- status[:source]
- end
- end
-
- private
- # escape ":" to "\\:". "\\" is in neither Unreserved Characters nor Reserved Characters (RFC3986)
- def escape_colon(str); str.gsub(":", "\\:") end
- end
- end
-end
diff --git a/collector/settings.rb b/collector/settings.rb
index 71c8c28..e1d0d9e 100644
--- a/collector/settings.rb
+++ b/collector/settings.rb
@@ -1,10 +1,15 @@
require "settingslogic"
-class Settings < Settingslogic
- def self.env
- ENV["RAILS_ENV"] || "development"
- end
+module Aclog
+ module Collector
+ class Settings < Settingslogic
+ def self.env
+ ENV["RAILS_ENV"] || "development"
+ end
- source "settings.yml"
- namespace env
+ source "settings.yml"
+ namespace env
+ end
+ end
end
+
diff --git a/collector/settings.yml.example b/collector/settings.yml.example
index 6100b3b..356e6c3 100644
--- a/collector/settings.yml.example
+++ b/collector/settings.yml.example
@@ -1,9 +1,7 @@
defaults: &defaults
consumer:
- - key: "consumer key for collector"
- secret: "consumer secret"
- - key: "second consumer key"
- secret: "consumer secret"
+ key: "consumer key for collector"
+ secret: "consumer secret"
secret_key: "secret key"
development:
<<: *defaults
diff --git a/collector/start.rb b/collector/start.rb
index 33c0827..c4e3024 100755
--- a/collector/start.rb
+++ b/collector/start.rb
@@ -1,9 +1,13 @@
#!/usr/bin/env ruby
require "./worker"
+require "./settings"
$stdout.sync = true
$stderr.sync = true
-worker = Aclog::Collector::Worker.new
+logger = Logger.new(STDOUT)
+logger.level = Aclog::Collector::Settings.env == "development" ? Logger::DEBUG : Logger::INFO
+
+worker = Aclog::Collector::Worker.new(logger)
worker.start
diff --git a/collector/stream.rb b/collector/stream.rb
deleted file mode 100644
index 239c9f5..0000000
--- a/collector/stream.rb
+++ /dev/null
@@ -1,182 +0,0 @@
-# -*- coding: utf-8 -*-
-require "em-twitter"
-require "yajl"
-require "./settings"
-require "./helper"
-
-module Aclog
- module Collector
- class Stream
- include Aclog::Collector::Helper
- attr_reader :client
- attr_accessor :logger
-
- def initialize(logger, callback, hash)
- @logger = logger
- @user_id = hash["user_id"]
- @account_id = hash["id"]
- @callback = callback
- ready_client(hash)
- end
-
- def ready_client(hash)
- @client = EM::Twitter::Client.new(gopts(hash))
- @client.on_error {|message|
- log(:error, "Unknown Error", message) }
- @client.on_enhance_your_calm {
- log(:warn, "Enhance your calm") }
- @client.on_no_data_received {
- log(:warn, "No data received") }
- @client.on_reconnect {|timeout, retries|
- log(:warn, "Reconnected", retries) }
- @client.on_max_reconnects {|timeout, retries|
- @client.connection.stop
- log(:warn, "Reached max reconnects", retries) }
- @client.on_unauthorized {
- log(:warn, "Unauthorized")
- @client.connection.stop
- @callback.call(type: "unauthorized", user_id: @user_id, id: @account_id) }
- @client.on_service_unavailable {
- # account deleted?
- log(:warn, "Service Unavailable")
- @client.connection.stop }
-
- @client.each do |chunk|
- begin
- hash = Yajl::Parser.parse(chunk, :symbolize_keys => true)
- rescue Yajl::ParseError
- log(:warn, "Unexpected chunk", chunk)
- next
- end
-
- if hash[:warning]
- log(:info, "Stall warning", hash[:warning])
- elsif hash[:delete]
- if d = hash[:delete][:status]
- send_delete(d[:id], d[:user_id])
- end
- elsif hash[:limit]
- log(:warn, "UserStreams Limit", hash[:limit][:track])
- elsif hash[:event]
- # event
- case hash[:event]
- when "favorite"
- send_favorite(hash[:source], hash[:target], hash[:target_object])
- when "unfavorite"
- send_unfavorite(hash[:source], hash[:target], hash[:target_object])
- end
- elsif hash[:user]
- # tweet
- if hash[:retweeted_status]
- if hash[:retweeted_status][:user][:id] == @user_id ||
- hash[:user][:id] == @user_id
- send_retweet(hash)
- end
- elsif hash[:user][:id] == @user_id
- send_tweet(hash)
- end
- elsif hash[:friends]
- # maybe first message
- log(:debug, "Received friends", hash[:friends].size)
- elsif hash[:scrub_geo]
- log(:debug, "scrub_geo", hash)
- else
- log(:info, "Unexpected UserStreams data", hash)
- end
- end
- @client
- end
-
- def start
- @client.connect
- log(:info, "Connected")
- end
-
- def update(hash)
- opts = gopts(hash)
- if opts[:oauth][:token] != @client.options[:oauth][:token]
- @client.connection.update(opts)
- log(:info, "Connection updated")
- end
- end
-
- def stop
- @client.connection.stop
- log(:info, "Disconnected")
- end
-
- private
- def log(level, msg, data = nil)
- @logger.send(level, "#{msg}(##{@account_id}/#{@user_id}): #{data}")
- end
-
- def gopts(msg)
- {
- host: "userstream.twitter.com",
- path: "/1.1/user.json",
- params: {
- with: "user"
- },
- oauth: {
- consumer_key: Settings.consumer[msg["consumer_version"]].key,
- consumer_secret: Settings.consumer[msg["consumer_version"]].secret,
- token: msg["oauth_token"],
- token_secret: msg["oauth_token_secret"]},
- method: :get
- }
- end
-
- def conv_user(user)
- {id: user[:id],
- screen_name: user[:screen_name],
- name: user[:name],
- profile_image_url: user[:profile_image_url_https],
- protected: user[:protected]}
- end
-
- def conv_tweet(status)
- {type: "tweet",
- id: status[:id],
- text: format_text(status),
- source: format_source(status),
- tweeted_at: status[:created_at],
- in_reply_to_status_id: status[:in_reply_to_status_id],
- user: conv_user(status[:user])}
- end
-
- def send_tweet(status)
- @callback.call(conv_tweet(status))
- log(:debug, "Sent tweet", status[:id])
- end
-
- def send_favorite(source, target, target_object)
- @callback.call(type: "favorite",
- tweet: conv_tweet(target_object),
- user: conv_user(source))
- log(:debug, "Sent favorite", source[:id] => target_object[:id])
- end
-
- def send_unfavorite(source, target, target_object)
- @callback.call(type: "unfavorite",
- tweet: conv_tweet(target_object),
- user: conv_user(source))
- log(:debug, "Sent unfavorite", source[:id] => target_object[:id])
- end
-
- def send_retweet(status)
- @callback.call(type: "retweet",
- id: status[:id],
- tweet: conv_tweet(status[:retweeted_status]),
- user: conv_user(status[:user]))
- log(:debug, "Sent retweet", status[:user][:id] => status[:retweeted_status][:id])
- end
-
- def send_delete(deleted_status_id, deleted_user_id)
- @callback.call(type: "delete",
- id: deleted_status_id,
- user_id: deleted_user_id)
- log(:debug, "Sent delete", deleted_user_id => deleted_status_id)
- end
- end
- end
-end
diff --git a/collector/user_stream.rb b/collector/user_stream.rb
new file mode 100644
index 0000000..07951a4
--- /dev/null
+++ b/collector/user_stream.rb
@@ -0,0 +1,165 @@
+require "em-twitter"
+require "yajl"
+require "./settings"
+
+module Aclog
+ module Collector
+ class UserStream
+ def initialize(logger, msg, &blk)
+ @logger = logger
+ @user_id = msg["user_id"]
+ @account_id = msg["id"]
+ @callback = blk
+ prepare_client(msg)
+ end
+
+ def callback(event, data)
+ @callback.call(event, data)
+ end
+
+ def prepare_client(msg)
+ client = EM::Twitter::Client.new(make_opts(msg))
+ client.on_error do |message|
+ log(:error, "Unknown error: #{message}")
+ end
+
+ client.on_enhance_your_calm do
+ log(:warn, "Enhance your calm")
+ end
+
+ client.on_no_data_received do
+ log(:warn, "No data received")
+ end
+
+ client.on_close do
+ log(:info, "disconnect")
+ end
+
+ client.on_reconnect do |timeout, retries|
+ log(:info, "Reconnected: #{retries}")
+ end
+
+ client.on_max_reconnects do |timeout, retries|
+ log(:warn, "Reached max reconnects: #{retries}")
+ stop
+ end
+
+ client.on_unauthorized do
+ log(:warn, "Unauthorized")
+ callback(:unauthorized, id: @account_id)
+ stop
+ end
+
+ client.on_service_unavailable do
+ # account deleted?
+ log(:warn, "Service unavailable")
+ stop
+ end
+
+ client.each do |item|
+ begin
+ hash = Yajl::Parser.parse(item, symbolize_keys: true)
+ rescue Yajl::ParseError
+ log(:warn, "JSON parse error: #{item}")
+ next
+ end
+
+ if hash[:warning]
+ log(:warn, "warning: #{hash[:warning]}")
+ elsif hash[:limit]
+ log(:warn, "limit: #{hash[:limit][:track]}")
+ elsif hash[:delete]
+ if d = hash[:delete][:status]
+ log(:debug, "delete: #{hash[:delete][:status]}")
+ callback(:delete, d)
+ end
+ elsif hash[:event]
+ case hash[:event]
+ when "favorite"
+ log(:debug, "favorite: #{hash[:source][:id]} => #{hash[:target_object][:id]}")
+ callback(:favorite,
+ source: reduce_user(hash[:source]),
+ target_object: reduce_tweet(hash[:target_object]))
+ when "unfavorite"
+ log(:debug, "unfavorite: #{hash[:source][:id]} => #{hash[:target_object][:id]}")
+ callback(:unfavorite,
+ source: reduce_user(hash[:source]),
+ target_object: reduce_tweet(hash[:target_object]))
+ end
+ elsif hash[:user]
+ if hash[:retweeted_status]
+ if hash[:retweeted_status][:user][:id] == @user_id || hash[:user][:id] == @user_id
+ log(:debug, "retweet: #{hash[:user][:id]} => #{hash[:retweeted_status][:id]}")
+ callback(:retweet,
+ id: hash[:id],
+ user: reduce_user(hash[:user]),
+ retweeted_status: reduce_tweet(hash[:retweeted_status]))
+ end
+ elsif hash[:user][:id] == @user_id
+ log(:debug, "tweet: #{hash[:user][:id]} => #{hash[:id]}")
+ callback(:tweet, reduce_tweet(hash))
+ end
+ elsif hash[:friends]
+ log(:debug, "friends: #{hash[:friends].size}")
+ elsif hash[:scrub_geo]
+ log(:debug, "scrub_geo: #{hash}")
+ else
+ log(:info, "Unknown streaming data: #{hash}")
+ end
+ end
+ @client = client
+ end
+
+ def start
+ @client.connect
+ log(:info, "Connected")
+ end
+
+ def update(hash)
+ opts = make_opts(hash)
+ if opts[:oauth][:token] != @client.options[:oauth][:token]
+ log(:info, "update")
+ @client.connection.update(opts)
+ end
+ end
+
+ def stop
+ @client.connection.stop
+ end
+
+ private
+ def log(level, message)
+ @logger.send(level, "[USERSTREAM:##{@account_id}:#{@user_id}] #{message}")
+ end
+
+ def make_opts(msg)
+ { host: "userstream.twitter.com",
+ path: "/1.1/user.json",
+ params: { with: "user" },
+ oauth: { consumer_key: msg["consumer_key"],
+ consumer_secret: msg["consumer_secret"],
+ token: msg["oauth_token"],
+ token_secret: msg["oauth_token_secret"] },
+ method: :get }
+ end
+
+ def reduce_user(user)
+ { id: user[:id],
+ screen_name: user[:screen_name],
+ name: user[:name],
+ profile_image_url: user[:profile_image_url_https],
+ protected: user[:protected] }
+ end
+
+ def reduce_tweet(status)
+ { id: status[:id],
+ text: status[:text],
+ entities: status[:entities],
+ source: status[:source],
+ created_at: status[:created_at],
+ in_reply_to_status_id: status[:in_reply_to_status_id],
+ user: reduce_user(status[:user]) }
+ end
+ end
+ end
+end
diff --git a/collector/worker.rb b/collector/worker.rb
index 6c901d9..88fa05e 100644
--- a/collector/worker.rb
+++ b/collector/worker.rb
@@ -1,21 +1,18 @@
-# -*- coding: utf-8 -*-
-require "logger"
+require "./settings"
require "./connection"
module Aclog
module Collector
class Worker
- def initialize
- @logger = Logger.new(STDOUT)
- @logger.level = Settings.env == "development" ? Logger::DEBUG : Logger::INFO
+ def initialize(logger)
+ @logger = logger
end
def start
EM.run do
- connection = EM.connect(Settings.receiver_host, Settings.receiver_port, Aclog::Collector::Connection, @logger)
+ connection = EM.connect(Settings.receiver_host, Settings.receiver_port, Connection, @logger)
stop = proc do
- @logger.info("Quitting collector...")
connection.quit
EM.stop
end
diff --git a/lib/aclog/receiver/collector_connection.rb b/lib/aclog/receiver/collector_connection.rb
index 36f652f..9f43f00 100644
--- a/lib/aclog/receiver/collector_connection.rb
+++ b/lib/aclog/receiver/collector_connection.rb
@@ -1,166 +1,141 @@
-# -*- coding: utf-8 -*-
-require "time"
-
module Aclog
module Receiver
class CollectorConnection < EM::Connection
- def initialize(connections)
+ def initialize(channel, connections)
+ @channel = channel
@connections = connections
@worker_number = nil
- @pac = MessagePack::Unpacker.new
-
- unless defined? @@queue
- @@queue = EM::Queue.new
-
- _cr = -> bl { bl.call; @@queue.pop(&_cr) }
- EM.defer { @@queue.pop(&_cr) }
- end
+ @unpacker = MessagePack::Unpacker.new
end
def send_account(account)
send_object(type: "account",
id: account.id,
+ consumer_key: Settings.consumer.key,
+ consumer_secret: Settings.consumer.secret,
oauth_token: account.oauth_token,
oauth_token_secret: account.oauth_token_secret,
user_id: account.user_id)
- Rails.logger.debug("Sent #{account.id}/#{account.user_id}")
+ log(:debug, "send: #{account.id}/#{account.user_id}")
end
def send_stop_account(account)
send_object(type: "stop",
id: account.id)
- Rails.logger.debug("Sent Stop #{account.id}/#{account.user_id}")
+ log(:debug, "send stop: #{account.id}/#{account.user_id}")
end
def post_init
- # なにもしない。クライアントが
end
def unbind
@connections.reject! {|k, v| v == self }
- Rails.logger.info("Connection closed(#{@worker_number})")
+ log(:info, "connection closed")
end
def receive_data(data)
- @pac.feed_each(data) do |msg|
+ @unpacker.feed_each(data) do |msg|
unless msg.is_a?(Hash) && msg["type"]
- Rails.logger.warn("Unknown data: #{msg}")
- send_object(type: "fatal", message: "Unknown data")
+ log(:error, "unknown data: #{msg}")
+ send_object(type: "fatal", message: "unknown data")
close_connection_after_writing
return
end
- if not @authorized and not msg["type"] == "init"
- Rails.logger.warn("Not authorized client: #{msg}")
- send_object(type: "fatal", message: "You aren't authorized")
- close_connection_after_writing
+ unless @authorized
+ if msg["type"] == "auth"
+ auth(msg)
+ else
+ log(:warn, "not authorized client: #{msg}")
+ send_object(type: "fatal", message: "You aren't authorized")
+ close_connection_after_writing
+ end
return
end
case msg["type"]
- when "init"
- receive_init(msg)
when "unauthorized"
- receive_unauthorized(msg)
+ @channel << -> {
+ log(:warn, "unauthorized: #{msg["user_id"]}")
+ }
when "tweet"
- receive_tweet(msg)
+ @channel << -> {
+ log(:debug, "receive tweet: #{msg["id"]}")
+ Tweet.from_receiver(msg)
+ }
when "favorite"
- receive_favorite(msg)
+ @channel << -> {
+ log(:debug, "receive favorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}")
+ if f = Favorite.from_receiver(msg)
+ f.tweet.notify_favorite
+ end
+ }
when "unfavorite"
- receive_unfavorite(msg)
+ @channel << -> {
+ log(:debug, "receive unfavorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}")
+ Favorite.delete_from_receiver(msg)
+ }
when "retweet"
- receive_retweet(msg)
+ @channel << -> {
+ log(:debug, "receive retweet: #{msg["user"]["id"]} => #{msg["retweeted_status"]["id"]}")
+ Retweet.from_receiver(msg)
+ }
when "delete"
- receive_delete(msg)
+ @channel << -> {
+ log(:debug, "receive delete: #{msg["id"]}")
+ Tweet.delete_from_receiver(msg)
+ }
when "quit"
- Rails.logger.info("Quit(#{@worker_number}): #{msg["reason"]}")
+ log(:info, "receive quit: #{msg["reason"]}")
send_data(type: "quit", message: "Bye")
close_connection_after_writing
else
- Rails.logger.warn("Unknown message type(#{@worker_number}): #{msg["type"]}")
- send_object(type: "error", message: "Unknown message type: #{msg["type"]}")
+ log(:warn, "unknown message: #{msg["type"]}")
+ send_object(type: "error", message: "Unknown message type")
end
end
end
private
+ def log(level, message)
+ text = "[RECEIVER"
+ text << ":#{@worker_number}" if @worker_number
+ text << "] #{message}"
+ Rails.logger.__send__(level, text)
+ end
+
def send_object(data)
send_data(data.to_msgpack)
end
- def receive_init(msg)
+ def auth(msg)
secret_key = msg["secret_key"]
unless secret_key == Settings.collector.secret_key
- Rails.logger.warn("Invalid secret_key: \"#{secret_key}\"")
- send_object(type: "fatal", message: "Invalid secret_key")
+ log(:warn, "Invalid secret_key: \"#{secret_key}\"")
+ send_object(type: "fatal", message: "invalid secret_key")
close_connection_after_writing
return
end
- worker_number = Settings.collector.count.times.find {|num| !@connections.key?(num) }
+ worker_number = (Settings.collector.count.times.to_a - @connections.keys).sort.first
if worker_number == nil
- Rails.logger.warn("Invalid worker_number: #{worker_number}")
- send_object(type: "fatal", message: "Invalid worker_number")
+ log(:warn, "all connection alive")
+ send_object(type: "fatal", message: "all connection alive")
close_connection_after_writing
return
end
- if @connections[worker_number]
- @connections[worker_number].close_connection
- end
@connections[worker_number] = self
@worker_number = worker_number
@authorized = true
- Rails.logger.info("Connected(#{@worker_number})")
- send_object(type: "ok", message: "Connected")
+ log(:info, "connect")
+ send_object(type: "ok", message: "connected")
Account.set_of_collector(@worker_number).each do |account|
send_account(account)
end
end
-
- def receive_unauthorized(msg)
- Rails.logger.warn("Unauthorized(#{@worker_number}): #{msg["user_id"]}")
- # unregister
- end
-
- def receive_tweet(msg)
- @@queue.push -> do
- Rails.logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}")
- Tweet.from_receiver(msg)
- end
- end
-
- def receive_favorite(msg)
- @@queue.push -> do
- Rails.logger.debug("Receive Favorite(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}")
- if f = Favorite.from_receiver(msg)
- f.tweet.notify_favorite
- end
- end
- end
-
- def receive_unfavorite(msg)
- @@queue.push -> do
- Rails.logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}")
- Favorite.delete_from_receiver(msg)
- end
- end
-
- def receive_retweet(msg)
- @@queue.push -> do
- Rails.logger.debug("Receive Retweet(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}")
- Retweet.from_receiver(msg)
- end
- end
-
- def receive_delete(msg)
- @@queue.push -> do
- Rails.logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}")
- Tweet.delete_from_receiver(msg)
- end
- end
end
end
end
diff --git a/lib/aclog/receiver/register_server.rb b/lib/aclog/receiver/register_server.rb
index 541efd5..f5b6060 100644
--- a/lib/aclog/receiver/register_server.rb
+++ b/lib/aclog/receiver/register_server.rb
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
module Aclog
module Receiver
class RegisterServer
diff --git a/lib/aclog/receiver/worker.rb b/lib/aclog/receiver/worker.rb
index 9896853..db60951 100644
--- a/lib/aclog/receiver/worker.rb
+++ b/lib/aclog/receiver/worker.rb
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
require "msgpack/rpc/transport/unix"
module Aclog
@@ -17,9 +16,12 @@ module Aclog
Rails.logger.info("Receiver started")
File.delete(_sock_path) if File.exists?(_sock_path)
EM.run do
+ channel = EM::Channel.new
+ EM.defer { channel.subscribe(&:call) }
+
connections = {}
- collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, connections)
+ collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, channel, connections)
reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(_sock_path)
register_server = MessagePack::RPC::Server.new