diff options
author | Rhenium <rhenium@rhe.jp> | 2013-12-20 21:42:05 +0900 |
---|---|---|
committer | Rhenium <rhenium@rhe.jp> | 2013-12-20 21:42:05 +0900 |
commit | 4b56cd6b67d890c5e2ed763656a7fd7832b10d8f (patch) | |
tree | eb394e9c187b8bbc60b54d16b72200edbf9380ab | |
parent | 376c50784c2c740c6caacd1e94550e3d9d77fe41 (diff) | |
download | aclog-4b56cd6b67d890c5e2ed763656a7fd7832b10d8f.tar.gz |
update workers
-rw-r--r-- | app/models/favorite.rb | 12 | ||||
-rw-r--r-- | app/models/retweet.rb | 8 | ||||
-rw-r--r-- | app/models/tweet.rb | 39 | ||||
-rw-r--r-- | app/models/user.rb | 2 | ||||
-rw-r--r-- | collector/Gemfile.lock | 10 | ||||
-rw-r--r-- | collector/connection.rb | 130 | ||||
-rw-r--r-- | collector/helper.rb | 46 | ||||
-rw-r--r-- | collector/settings.rb | 17 | ||||
-rw-r--r-- | collector/settings.yml.example | 6 | ||||
-rwxr-xr-x | collector/start.rb | 6 | ||||
-rw-r--r-- | collector/stream.rb | 182 | ||||
-rw-r--r-- | collector/user_stream.rb | 165 | ||||
-rw-r--r-- | collector/worker.rb | 11 | ||||
-rw-r--r-- | lib/aclog/receiver/collector_connection.rb | 149 | ||||
-rw-r--r-- | lib/aclog/receiver/register_server.rb | 1 | ||||
-rw-r--r-- | lib/aclog/receiver/worker.rb | 6 |
16 files changed, 368 insertions, 422 deletions
diff --git a/app/models/favorite.rb b/app/models/favorite.rb index 44a8ae4..08ee580 100644 --- a/app/models/favorite.rb +++ b/app/models/favorite.rb @@ -12,14 +12,14 @@ class Favorite < ActiveRecord::Base def self.from_receiver(msg) transaction do - t = Tweet.from_receiver(msg["tweet"]) - u = User.from_receiver(msg["user"]) - f = logger.quietly { t.favorites.create!(user: u) } - logger.debug("Created Favorite: #{msg["user"]["id"]} => #{msg["tweet"]["id"]}") + t = Tweet.from_receiver(msg["target_object"]) + u = User.from_receiver(msg["source"]) + f = t.favorites.create!(user: u) + logger.debug("Created Favorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}") return f end rescue ActiveRecord::RecordNotUnique - logger.debug("Duplicate Favorite: #{msg["user"]["id"]} => #{msg["tweet"]["id"]}") + logger.debug("Duplicate Favorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}") return nil rescue => e logger.error("Unknown error while inserting favorite: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}") @@ -27,6 +27,6 @@ class Favorite < ActiveRecord::Base end def self.delete_from_receiver(msg) - where(tweet_id: msg["tweet"]["id"], user_id: msg["user"]["id"]).destroy_all + where(tweet_id: msg["target_object"]["id"], user_id: msg["source"]["id"]).destroy_all end end diff --git a/app/models/retweet.rb b/app/models/retweet.rb index db937cf..807ca1c 100644 --- a/app/models/retweet.rb +++ b/app/models/retweet.rb @@ -12,14 +12,14 @@ class Retweet < ActiveRecord::Base def self.from_receiver(msg) transaction do - t = Tweet.from_receiver(msg["tweet"]) + t = Tweet.from_receiver(msg["retweeted_status"]) u = User.from_receiver(msg["user"]) - r = logger.quietly { t.retweets.create!(id: msg["id"], user: u) } - logger.debug("Created Retweet: #{msg["id"]}: #{msg["user"]["id"]} => #{msg["tweet"]["id"]}") + r = t.retweets.create!(id: msg["id"], user: u) + logger.debug("Created Retweet: #{msg["id"]}: #{msg["user"]["id"]} => #{msg["retweeted_status"]["id"]}") return r end rescue ActiveRecord::RecordNotUnique - logger.debug("Duplicate Retweet: #{msg["id"]}: #{msg["user"]["id"]} => #{msg["tweet"]["id"]}") + logger.debug("Duplicate Retweet: #{msg["id"]}: #{msg["user"]["id"]} => #{msg["retweeted_status"]["id"]}") return nil rescue => e logger.error("Unknown error while inserting retweet: #{e.class}: #{e.message}/#{e.backtrace.join("\n")}") diff --git a/app/models/tweet.rb b/app/models/tweet.rb index 78adb2e..2642d3b 100644 --- a/app/models/tweet.rb +++ b/app/models/tweet.rb @@ -86,14 +86,12 @@ class Tweet < ActiveRecord::Base unless t begin u = User.from_receiver(msg["user"]) - t = logger.quietly do - self.create!(id: msg["id"], - text: msg["text"], - source: msg["source"], - tweeted_at: Time.parse(msg["tweeted_at"]), - in_reply_to_id: msg["in_reply_to_status_id"], - user: u) - end + t = self.create!(id: msg["id"], + text: extract_entities(msg["text"], msg["entities"]), + source: msg["source"], + tweeted_at: Time.parse(msg["created_at"]), + in_reply_to_id: msg["in_reply_to_status_id"], + user: u) logger.debug("Created Tweet: #{msg["id"]}") rescue ActiveRecord::RecordNotUnique logger.debug("Duplicate Tweet: #{msg["id"]}") @@ -129,6 +127,31 @@ class Tweet < ActiveRecord::Base end private + def self.extract_entities(text, entities) + escape_colon = -> str { str.gsub(":", "\\:") } + entities = entities.map { |k, v| v.map { |n| n.update("type" => k) } }.flatten.sort_by { |entity| entity["indices"].first } + + result = "" + last_index = entities.inject(0) do |last_index_, entity| + result << text[last_index_...entity["indices"].first] + case entity["type"] + when "urls", "media" + result << "<url:#{escape_colon.call(entity["expanded_url"])}:#{escape_colon.call(entity["display_url"])}>" + when "hashtags" + result << "<hashtag:#{entity["text"]}>" + when "user_mentions" + result << "<mention:#{entity["screen_name"]}>" + when "symbols" + result << "<symbol:#{entity["text"]}>" + end + + entity["indices"].last + end + result << text[last_index..-1] + + result + end + def self.parse_condition(token, strings) tweets = Tweet.arel_table escape_text = -> str do diff --git a/app/models/user.rb b/app/models/user.rb index 983273e..80b4a9d 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -30,7 +30,7 @@ class User < ActiveRecord::Base if att["screen_name"] == user.screen_name && att["name"] == user.name && - att["profile_image_url"][-44..-1] == user.profile_image_url[-44..-1] && + att["profile_image_url"] == user.profile_image_url && att["protected"] == user.protected? logger.debug("User not changed: #{user.id}") else diff --git a/collector/Gemfile.lock b/collector/Gemfile.lock index 09da2d0..a6eb046 100644 --- a/collector/Gemfile.lock +++ b/collector/Gemfile.lock @@ -1,16 +1,16 @@ GEM remote: https://rubygems.org/ specs: - em-twitter (0.3.1) + em-twitter (0.3.2) eventmachine (~> 1.0) - http_parser.rb (>= 0.6.0.beta.2, < 0.7) + http_parser.rb (~> 0.6) simple_oauth (~> 0.2) eventmachine (1.0.3) - http_parser.rb (0.6.0.beta.2) - msgpack (0.5.7) + http_parser.rb (0.6.0) + msgpack (0.5.8) settingslogic (2.0.9) simple_oauth (0.2.0) - yajl-ruby (1.1.0) + yajl-ruby (1.2.0) PLATFORMS ruby diff --git a/collector/connection.rb b/collector/connection.rb index 9af19fe..f73fbbd 100644 --- a/collector/connection.rb +++ b/collector/connection.rb @@ -1,83 +1,89 @@ -# -*- coding: utf-8 -*- require "msgpack" require "./settings" -require "./stream" +require "./user_stream" -module Aclog - module Collector - class Connection < EM::Connection - def initialize(logger) - @logger = logger - @clients = {} - @pac = MessagePack::Unpacker.new - @connected = false - end +module Aclog::Collector + class Connection < EM::Connection + def initialize(logger) + @logger = logger + @clients = {} + @unpacker = MessagePack::Unpacker.new + @reconnect = true + end - def post_init - send_object(type: "init", - secret_key: Settings.secret_key) - end + def post_init + send_object(type: "auth", + secret_key: Settings.secret_key) + end + + def unbind + if @reconnect + log(:info, "reconnecting...") - def unbind - @logger.info("Connection closed") if @connected EM.add_timer(10) do - @connected = false reconnect(Settings.receiver_host, Settings.receiver_port) post_init end end + end - # Server - def receive_data(data) - @pac.feed_each(data) do |msg| - if not msg.is_a?(Hash) or not msg["type"] - @logger.warn("Unknown data: #{msg}") - close_connection_after_writing - return - end + def receive_data(data) + @unpacker.feed_each(data) do |msg| + unless msg.is_a?(Hash) && msg["type"] + log(:warn, "unknown data: #{msg}") + @reconnect = false + close_connection + return + end - case msg["type"] - when "ok" - @connected = true - @logger.info("Connected with server") - when "error" - @logger.info("error: #{msg["message"]}") - when "fatal" - @logger.info("fatal: #{msg["message"]}") - close_connection - when "bye" - close_connection - when "account" - account_id = msg["id"] - if not @clients[account_id] - user_connection = Aclog::Collector::Stream.new(@logger, method(:send_object), msg) - user_connection.start - @clients[account_id] = user_connection - else - @clients[account_id].update(msg) - end - when "stop" - account_id = msg["id"] - if @clients[account_id] - @clients[account_id].stop - @clients.delete(account_id) - @logger.info("Received account stop") - end + case msg["type"] + when "ok" + log(:info, "connection established") + when "error" + log(:error, "error: #{msg}") + when "fatal" + log(:fatal, "fatal: #{msg}") + @reconnect = false + close_connection + when "account" + account_id = msg["id"] + if @clients[account_id] + @clients[account_id].update(msg) else - @logger.info("Unknown message: #{msg}") + stream = UserStream.new(@logger, msg) do |event, data| + send_object(data.merge(type: event)) + end + stream.start + @clients[account_id] = stream + log(:info, "registered: #{account_id}") + end + when "stop" + account_id = msg["id"] + client = @clients[account_id] + if client + client.stop + @clients.delete(account_id) + log(:info, "unregistered: #{account_id}") end + else + log(:warn, "unknown message: #{msg}") end end + end - def quit - send_object(type: "quit", reason: "stop") - @clients.values.map(&:stop) - end + def quit + @reconnect = false + send_object(type: "quit", reason: "stop") + @clients.values.each(&:stop) + end - private - def send_object(data) - send_data(data.to_msgpack) - end + private + def send_object(data) + send_data(data.to_msgpack) + end + + def log(level, message) + @logger.__send__(level, "[WORKER] #{message}") end end end diff --git a/collector/helper.rb b/collector/helper.rb deleted file mode 100644 index 96903cc..0000000 --- a/collector/helper.rb +++ /dev/null @@ -1,46 +0,0 @@ -# -*- encoding: utf-8 -*- - -module Aclog - module Collector - module Helper - def format_text(status) - text = status[:text] - entities = status[:entities].map{|k, v| v.map{|n| n.update(type: k)}}.flatten.sort_by{|entity| entity[:indices].first} - - result = "" - last_index = entities.inject(0) do |last_index_, entity| - result << text[last_index_...entity[:indices].first] - case entity[:type] - when :urls, :media - result << "<url:#{escape_colon(entity[:expanded_url])}:#{escape_colon(entity[:display_url])}>" - when :hashtags - result << "<hashtag:#{entity[:text]}>" - when :user_mentions - result << "<mention:#{entity[:screen_name]}>" - when :symbols - result << "<symbol:#{entity[:text]}>" - end - - entity[:indices].last - end - result << text[last_index..-1] - - result - end - - def format_source(status) - if status[:source].include?("<a") - url, name = status[:source].scan(/<a href="(.+?)" rel="nofollow">(.+?)<\/a>/).flatten - "<url:#{escape_colon(url)}:#{escape_colon(name)}>" - else - # web, txt, .. - status[:source] - end - end - - private - # escape ":" to "\\:". "\\" is in neither Unreserved Characters nor Reserved Characters (RFC3986) - def escape_colon(str); str.gsub(":", "\\:") end - end - end -end diff --git a/collector/settings.rb b/collector/settings.rb index 71c8c28..e1d0d9e 100644 --- a/collector/settings.rb +++ b/collector/settings.rb @@ -1,10 +1,15 @@ require "settingslogic" -class Settings < Settingslogic - def self.env - ENV["RAILS_ENV"] || "development" - end +module Aclog + module Collector + class Settings < Settingslogic + def self.env + ENV["RAILS_ENV"] || "development" + end - source "settings.yml" - namespace env + source "settings.yml" + namespace env + end + end end + diff --git a/collector/settings.yml.example b/collector/settings.yml.example index 6100b3b..356e6c3 100644 --- a/collector/settings.yml.example +++ b/collector/settings.yml.example @@ -1,9 +1,7 @@ defaults: &defaults consumer: - - key: "consumer key for collector" - secret: "consumer secret" - - key: "second consumer key" - secret: "consumer secret" + key: "consumer key for collector" + secret: "consumer secret" secret_key: "secret key" development: <<: *defaults diff --git a/collector/start.rb b/collector/start.rb index 33c0827..c4e3024 100755 --- a/collector/start.rb +++ b/collector/start.rb @@ -1,9 +1,13 @@ #!/usr/bin/env ruby require "./worker" +require "./settings" $stdout.sync = true $stderr.sync = true -worker = Aclog::Collector::Worker.new +logger = Logger.new(STDOUT) +logger.level = Aclog::Collector::Settings.env == "development" ? Logger::DEBUG : Logger::INFO + +worker = Aclog::Collector::Worker.new(logger) worker.start diff --git a/collector/stream.rb b/collector/stream.rb deleted file mode 100644 index 239c9f5..0000000 --- a/collector/stream.rb +++ /dev/null @@ -1,182 +0,0 @@ -# -*- coding: utf-8 -*- -require "em-twitter" -require "yajl" -require "./settings" -require "./helper" - -module Aclog - module Collector - class Stream - include Aclog::Collector::Helper - attr_reader :client - attr_accessor :logger - - def initialize(logger, callback, hash) - @logger = logger - @user_id = hash["user_id"] - @account_id = hash["id"] - @callback = callback - ready_client(hash) - end - - def ready_client(hash) - @client = EM::Twitter::Client.new(gopts(hash)) - @client.on_error {|message| - log(:error, "Unknown Error", message) } - @client.on_enhance_your_calm { - log(:warn, "Enhance your calm") } - @client.on_no_data_received { - log(:warn, "No data received") } - @client.on_reconnect {|timeout, retries| - log(:warn, "Reconnected", retries) } - @client.on_max_reconnects {|timeout, retries| - @client.connection.stop - log(:warn, "Reached max reconnects", retries) } - @client.on_unauthorized { - log(:warn, "Unauthorized") - @client.connection.stop - @callback.call(type: "unauthorized", user_id: @user_id, id: @account_id) } - @client.on_service_unavailable { - # account deleted? - log(:warn, "Service Unavailable") - @client.connection.stop } - - @client.each do |chunk| - begin - hash = Yajl::Parser.parse(chunk, :symbolize_keys => true) - rescue Yajl::ParseError - log(:warn, "Unexpected chunk", chunk) - next - end - - if hash[:warning] - log(:info, "Stall warning", hash[:warning]) - elsif hash[:delete] - if d = hash[:delete][:status] - send_delete(d[:id], d[:user_id]) - end - elsif hash[:limit] - log(:warn, "UserStreams Limit", hash[:limit][:track]) - elsif hash[:event] - # event - case hash[:event] - when "favorite" - send_favorite(hash[:source], hash[:target], hash[:target_object]) - when "unfavorite" - send_unfavorite(hash[:source], hash[:target], hash[:target_object]) - end - elsif hash[:user] - # tweet - if hash[:retweeted_status] - if hash[:retweeted_status][:user][:id] == @user_id || - hash[:user][:id] == @user_id - send_retweet(hash) - end - elsif hash[:user][:id] == @user_id - send_tweet(hash) - end - elsif hash[:friends] - # maybe first message - log(:debug, "Received friends", hash[:friends].size) - elsif hash[:scrub_geo] - log(:debug, "scrub_geo", hash) - else - log(:info, "Unexpected UserStreams data", hash) - end - end - @client - end - - def start - @client.connect - log(:info, "Connected") - end - - def update(hash) - opts = gopts(hash) - if opts[:oauth][:token] != @client.options[:oauth][:token] - @client.connection.update(opts) - log(:info, "Connection updated") - end - end - - def stop - @client.connection.stop - log(:info, "Disconnected") - end - - private - def log(level, msg, data = nil) - @logger.send(level, "#{msg}(##{@account_id}/#{@user_id}): #{data}") - end - - def gopts(msg) - { - host: "userstream.twitter.com", - path: "/1.1/user.json", - params: { - with: "user" - }, - oauth: { - consumer_key: Settings.consumer[msg["consumer_version"]].key, - consumer_secret: Settings.consumer[msg["consumer_version"]].secret, - token: msg["oauth_token"], - token_secret: msg["oauth_token_secret"]}, - method: :get - } - end - - def conv_user(user) - {id: user[:id], - screen_name: user[:screen_name], - name: user[:name], - profile_image_url: user[:profile_image_url_https], - protected: user[:protected]} - end - - def conv_tweet(status) - {type: "tweet", - id: status[:id], - text: format_text(status), - source: format_source(status), - tweeted_at: status[:created_at], - in_reply_to_status_id: status[:in_reply_to_status_id], - user: conv_user(status[:user])} - end - - def send_tweet(status) - @callback.call(conv_tweet(status)) - log(:debug, "Sent tweet", status[:id]) - end - - def send_favorite(source, target, target_object) - @callback.call(type: "favorite", - tweet: conv_tweet(target_object), - user: conv_user(source)) - log(:debug, "Sent favorite", source[:id] => target_object[:id]) - end - - def send_unfavorite(source, target, target_object) - @callback.call(type: "unfavorite", - tweet: conv_tweet(target_object), - user: conv_user(source)) - log(:debug, "Sent unfavorite", source[:id] => target_object[:id]) - end - - def send_retweet(status) - @callback.call(type: "retweet", - id: status[:id], - tweet: conv_tweet(status[:retweeted_status]), - user: conv_user(status[:user])) - log(:debug, "Sent retweet", status[:user][:id] => status[:retweeted_status][:id]) - end - - def send_delete(deleted_status_id, deleted_user_id) - @callback.call(type: "delete", - id: deleted_status_id, - user_id: deleted_user_id) - log(:debug, "Sent delete", deleted_user_id => deleted_status_id) - end - end - end -end diff --git a/collector/user_stream.rb b/collector/user_stream.rb new file mode 100644 index 0000000..07951a4 --- /dev/null +++ b/collector/user_stream.rb @@ -0,0 +1,165 @@ +require "em-twitter" +require "yajl" +require "./settings" + +module Aclog + module Collector + class UserStream + def initialize(logger, msg, &blk) + @logger = logger + @user_id = msg["user_id"] + @account_id = msg["id"] + @callback = blk + prepare_client(msg) + end + + def callback(event, data) + @callback.call(event, data) + end + + def prepare_client(msg) + client = EM::Twitter::Client.new(make_opts(msg)) + client.on_error do |message| + log(:error, "Unknown error: #{message}") + end + + client.on_enhance_your_calm do + log(:warn, "Enhance your calm") + end + + client.on_no_data_received do + log(:warn, "No data received") + end + + client.on_close do + log(:info, "disconnect") + end + + client.on_reconnect do |timeout, retries| + log(:info, "Reconnected: #{retries}") + end + + client.on_max_reconnects do |timeout, retries| + log(:warn, "Reached max reconnects: #{retries}") + stop + end + + client.on_unauthorized do + log(:warn, "Unauthorized") + callback(:unauthorized, id: @account_id) + stop + end + + client.on_service_unavailable do + # account deleted? + log(:warn, "Service unavailable") + stop + end + + client.each do |item| + begin + hash = Yajl::Parser.parse(item, symbolize_keys: true) + rescue Yajl::ParseError + log(:warn, "JSON parse error: #{item}") + next + end + + if hash[:warning] + log(:warn, "warning: #{hash[:warning]}") + elsif hash[:limit] + log(:warn, "limit: #{hash[:limit][:track]}") + elsif hash[:delete] + if d = hash[:delete][:status] + log(:debug, "delete: #{hash[:delete][:status]}") + callback(:delete, d) + end + elsif hash[:event] + case hash[:event] + when "favorite" + log(:debug, "favorite: #{hash[:source][:id]} => #{hash[:target_object][:id]}") + callback(:favorite, + source: reduce_user(hash[:source]), + target_object: reduce_tweet(hash[:target_object])) + when "unfavorite" + log(:debug, "unfavorite: #{hash[:source][:id]} => #{hash[:target_object][:id]}") + callback(:unfavorite, + source: reduce_user(hash[:source]), + target_object: reduce_tweet(hash[:target_object])) + end + elsif hash[:user] + if hash[:retweeted_status] + if hash[:retweeted_status][:user][:id] == @user_id || hash[:user][:id] == @user_id + log(:debug, "retweet: #{hash[:user][:id]} => #{hash[:retweeted_status][:id]}") + callback(:retweet, + id: hash[:id], + user: reduce_user(hash[:user]), + retweeted_status: reduce_tweet(hash[:retweeted_status])) + end + elsif hash[:user][:id] == @user_id + log(:debug, "tweet: #{hash[:user][:id]} => #{hash[:id]}") + callback(:tweet, reduce_tweet(hash)) + end + elsif hash[:friends] + log(:debug, "friends: #{hash[:friends].size}") + elsif hash[:scrub_geo] + log(:debug, "scrub_geo: #{hash}") + else + log(:info, "Unknown streaming data: #{hash}") + end + end + @client = client + end + + def start + @client.connect + log(:info, "Connected") + end + + def update(hash) + opts = make_opts(hash) + if opts[:oauth][:token] != @client.options[:oauth][:token] + log(:info, "update") + @client.connection.update(opts) + end + end + + def stop + @client.connection.stop + end + + private + def log(level, message) + @logger.send(level, "[USERSTREAM:##{@account_id}:#{@user_id}] #{message}") + end + + def make_opts(msg) + { host: "userstream.twitter.com", + path: "/1.1/user.json", + params: { with: "user" }, + oauth: { consumer_key: msg["consumer_key"], + consumer_secret: msg["consumer_secret"], + token: msg["oauth_token"], + token_secret: msg["oauth_token_secret"] }, + method: :get } + end + + def reduce_user(user) + { id: user[:id], + screen_name: user[:screen_name], + name: user[:name], + profile_image_url: user[:profile_image_url_https], + protected: user[:protected] } + end + + def reduce_tweet(status) + { id: status[:id], + text: status[:text], + entities: status[:entities], + source: status[:source], + created_at: status[:created_at], + in_reply_to_status_id: status[:in_reply_to_status_id], + user: reduce_user(status[:user]) } + end + end + end +end diff --git a/collector/worker.rb b/collector/worker.rb index 6c901d9..88fa05e 100644 --- a/collector/worker.rb +++ b/collector/worker.rb @@ -1,21 +1,18 @@ -# -*- coding: utf-8 -*- -require "logger" +require "./settings" require "./connection" module Aclog module Collector class Worker - def initialize - @logger = Logger.new(STDOUT) - @logger.level = Settings.env == "development" ? Logger::DEBUG : Logger::INFO + def initialize(logger) + @logger = logger end def start EM.run do - connection = EM.connect(Settings.receiver_host, Settings.receiver_port, Aclog::Collector::Connection, @logger) + connection = EM.connect(Settings.receiver_host, Settings.receiver_port, Connection, @logger) stop = proc do - @logger.info("Quitting collector...") connection.quit EM.stop end diff --git a/lib/aclog/receiver/collector_connection.rb b/lib/aclog/receiver/collector_connection.rb index 36f652f..9f43f00 100644 --- a/lib/aclog/receiver/collector_connection.rb +++ b/lib/aclog/receiver/collector_connection.rb @@ -1,166 +1,141 @@ -# -*- coding: utf-8 -*- -require "time" - module Aclog module Receiver class CollectorConnection < EM::Connection - def initialize(connections) + def initialize(channel, connections) + @channel = channel @connections = connections @worker_number = nil - @pac = MessagePack::Unpacker.new - - unless defined? @@queue - @@queue = EM::Queue.new - - _cr = -> bl { bl.call; @@queue.pop(&_cr) } - EM.defer { @@queue.pop(&_cr) } - end + @unpacker = MessagePack::Unpacker.new end def send_account(account) send_object(type: "account", id: account.id, + consumer_key: Settings.consumer.key, + consumer_secret: Settings.consumer.secret, oauth_token: account.oauth_token, oauth_token_secret: account.oauth_token_secret, user_id: account.user_id) - Rails.logger.debug("Sent #{account.id}/#{account.user_id}") + log(:debug, "send: #{account.id}/#{account.user_id}") end def send_stop_account(account) send_object(type: "stop", id: account.id) - Rails.logger.debug("Sent Stop #{account.id}/#{account.user_id}") + log(:debug, "send stop: #{account.id}/#{account.user_id}") end def post_init - # なにもしない。クライアントが end def unbind @connections.reject! {|k, v| v == self } - Rails.logger.info("Connection closed(#{@worker_number})") + log(:info, "connection closed") end def receive_data(data) - @pac.feed_each(data) do |msg| + @unpacker.feed_each(data) do |msg| unless msg.is_a?(Hash) && msg["type"] - Rails.logger.warn("Unknown data: #{msg}") - send_object(type: "fatal", message: "Unknown data") + log(:error, "unknown data: #{msg}") + send_object(type: "fatal", message: "unknown data") close_connection_after_writing return end - if not @authorized and not msg["type"] == "init" - Rails.logger.warn("Not authorized client: #{msg}") - send_object(type: "fatal", message: "You aren't authorized") - close_connection_after_writing + unless @authorized + if msg["type"] == "auth" + auth(msg) + else + log(:warn, "not authorized client: #{msg}") + send_object(type: "fatal", message: "You aren't authorized") + close_connection_after_writing + end return end case msg["type"] - when "init" - receive_init(msg) when "unauthorized" - receive_unauthorized(msg) + @channel << -> { + log(:warn, "unauthorized: #{msg["user_id"]}") + } when "tweet" - receive_tweet(msg) + @channel << -> { + log(:debug, "receive tweet: #{msg["id"]}") + Tweet.from_receiver(msg) + } when "favorite" - receive_favorite(msg) + @channel << -> { + log(:debug, "receive favorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}") + if f = Favorite.from_receiver(msg) + f.tweet.notify_favorite + end + } when "unfavorite" - receive_unfavorite(msg) + @channel << -> { + log(:debug, "receive unfavorite: #{msg["source"]["id"]} => #{msg["target_object"]["id"]}") + Favorite.delete_from_receiver(msg) + } when "retweet" - receive_retweet(msg) + @channel << -> { + log(:debug, "receive retweet: #{msg["user"]["id"]} => #{msg["retweeted_status"]["id"]}") + Retweet.from_receiver(msg) + } when "delete" - receive_delete(msg) + @channel << -> { + log(:debug, "receive delete: #{msg["id"]}") + Tweet.delete_from_receiver(msg) + } when "quit" - Rails.logger.info("Quit(#{@worker_number}): #{msg["reason"]}") + log(:info, "receive quit: #{msg["reason"]}") send_data(type: "quit", message: "Bye") close_connection_after_writing else - Rails.logger.warn("Unknown message type(#{@worker_number}): #{msg["type"]}") - send_object(type: "error", message: "Unknown message type: #{msg["type"]}") + log(:warn, "unknown message: #{msg["type"]}") + send_object(type: "error", message: "Unknown message type") end end end private + def log(level, message) + text = "[RECEIVER" + text << ":#{@worker_number}" if @worker_number + text << "] #{message}" + Rails.logger.__send__(level, text) + end + def send_object(data) send_data(data.to_msgpack) end - def receive_init(msg) + def auth(msg) secret_key = msg["secret_key"] unless secret_key == Settings.collector.secret_key - Rails.logger.warn("Invalid secret_key: \"#{secret_key}\"") - send_object(type: "fatal", message: "Invalid secret_key") + log(:warn, "Invalid secret_key: \"#{secret_key}\"") + send_object(type: "fatal", message: "invalid secret_key") close_connection_after_writing return end - worker_number = Settings.collector.count.times.find {|num| !@connections.key?(num) } + worker_number = (Settings.collector.count.times.to_a - @connections.keys).sort.first if worker_number == nil - Rails.logger.warn("Invalid worker_number: #{worker_number}") - send_object(type: "fatal", message: "Invalid worker_number") + log(:warn, "all connection alive") + send_object(type: "fatal", message: "all connection alive") close_connection_after_writing return end - if @connections[worker_number] - @connections[worker_number].close_connection - end @connections[worker_number] = self @worker_number = worker_number @authorized = true - Rails.logger.info("Connected(#{@worker_number})") - send_object(type: "ok", message: "Connected") + log(:info, "connect") + send_object(type: "ok", message: "connected") Account.set_of_collector(@worker_number).each do |account| send_account(account) end end - - def receive_unauthorized(msg) - Rails.logger.warn("Unauthorized(#{@worker_number}): #{msg["user_id"]}") - # unregister - end - - def receive_tweet(msg) - @@queue.push -> do - Rails.logger.debug("Received Tweet(#{@worker_number}): #{msg["id"]}") - Tweet.from_receiver(msg) - end - end - - def receive_favorite(msg) - @@queue.push -> do - Rails.logger.debug("Receive Favorite(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}") - if f = Favorite.from_receiver(msg) - f.tweet.notify_favorite - end - end - end - - def receive_unfavorite(msg) - @@queue.push -> do - Rails.logger.debug("Receive Unfavorite(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}") - Favorite.delete_from_receiver(msg) - end - end - - def receive_retweet(msg) - @@queue.push -> do - Rails.logger.debug("Receive Retweet(#{@worker_number}): #{msg["user"]["id"]} => #{msg["tweet"]["id"]}") - Retweet.from_receiver(msg) - end - end - - def receive_delete(msg) - @@queue.push -> do - Rails.logger.debug("Receive Delete(#{@worker_number}): #{msg["id"]}") - Tweet.delete_from_receiver(msg) - end - end end end end diff --git a/lib/aclog/receiver/register_server.rb b/lib/aclog/receiver/register_server.rb index 541efd5..f5b6060 100644 --- a/lib/aclog/receiver/register_server.rb +++ b/lib/aclog/receiver/register_server.rb @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- module Aclog module Receiver class RegisterServer diff --git a/lib/aclog/receiver/worker.rb b/lib/aclog/receiver/worker.rb index 9896853..db60951 100644 --- a/lib/aclog/receiver/worker.rb +++ b/lib/aclog/receiver/worker.rb @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- require "msgpack/rpc/transport/unix" module Aclog @@ -17,9 +16,12 @@ module Aclog Rails.logger.info("Receiver started") File.delete(_sock_path) if File.exists?(_sock_path) EM.run do + channel = EM::Channel.new + EM.defer { channel.subscribe(&:call) } + connections = {} - collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, connections) + collector_server = EM.start_server("0.0.0.0", Settings.collector.server_port, CollectorConnection, channel, connections) reg_svr_listener = MessagePack::RPC::UNIXServerTransport.new(_sock_path) register_server = MessagePack::RPC::Server.new |