diff options
-rw-r--r-- | .gitignore | 7 | ||||
-rw-r--r-- | Gemfile | 4 | ||||
-rw-r--r-- | Gemfile.lock | 20 | ||||
-rw-r--r-- | app/controllers/sessions_controller.rb | 16 | ||||
-rw-r--r-- | app/controllers/users_controller.rb | 12 | ||||
-rw-r--r-- | app/views/shared/_tweets.haml | 2 | ||||
-rw-r--r-- | client/Gemfile | 6 | ||||
-rw-r--r-- | client/Gemfile.lock | 20 | ||||
-rw-r--r-- | client/logger.rb | 44 | ||||
-rw-r--r-- | client/settings.rb | 5 | ||||
-rw-r--r-- | client/settings.yml.default | 7 | ||||
-rw-r--r-- | client/worker.rb | 257 | ||||
-rw-r--r-- | config/database.yml | 23 | ||||
-rw-r--r-- | config/locales/en.yml | 29 | ||||
-rw-r--r-- | config/settings.yml | 14 | ||||
-rw-r--r-- | config/unicorn.rb | 33 | ||||
-rw-r--r-- | db/migrate/20130225123010_create_accounts.rb | 7 | ||||
-rw-r--r-- | db/migrate/20130226150329_create_tweets.rb | 10 | ||||
-rw-r--r-- | db/migrate/20130226150829_create_favorites.rb | 7 | ||||
-rw-r--r-- | db/migrate/20130226151042_create_retweets.rb | 6 | ||||
-rw-r--r-- | db/schema.rb | 28 | ||||
-rw-r--r-- | lib/receiver/worker.rb | 378 | ||||
-rwxr-xr-x | worker.sh | 5 |
23 files changed, 625 insertions, 315 deletions
@@ -22,7 +22,8 @@ /config/environments/*.local.yml /config/initializers/secret_token.local.rb -# dotcloud -/dotcloud.yml -/supervisord.conf +# local scripts +/client/worker.sh +/client/settings.yml +/server.sh @@ -7,7 +7,8 @@ gem 'mysql2' gem 'unicorn' gem 'rails_config' -gem 'will_paginate', :github => 're4k/will_paginate' +gem 'kaminari' +gem 'em-work_queue' group :assets do gem 'sass-rails', '~> 4.0.0.beta1' @@ -15,7 +16,6 @@ group :assets do gem 'uglifier', '>= 1.0.3' end -gem 'therubyracer' gem 'haml-rails' gem 'yajl-ruby', :require => "yajl" diff --git a/Gemfile.lock b/Gemfile.lock index c38d712..6020296 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -6,12 +6,6 @@ GIT multi_json (~> 1.3) omniauth-oauth (~> 1.0) -GIT - remote: git://github.com/re4k/will_paginate.git - revision: fcbbd91ced44aaabb1f369875ee9f4f651bc74fb - specs: - will_paginate (3.0.4) - GEM remote: https://rubygems.org/ specs: @@ -53,6 +47,8 @@ GEM eventmachine (~> 1.0) http_parser.rb (~> 0.5) simple_oauth (~> 0.1) + em-work_queue (0.0.1) + eventmachine erubis (2.7.0) eventmachine (1.0.1) execjs (1.4.0) @@ -71,8 +67,10 @@ GEM http_parser.rb (0.5.3) i18n (0.6.4) json (1.7.7) + kaminari (0.14.1) + actionpack (>= 3.0.0) + activesupport (>= 3.0.0) kgio (2.8.0) - libv8 (3.11.8.13) mail (2.5.3) i18n (>= 0.4.0) mime-types (~> 1.16) @@ -113,7 +111,6 @@ GEM rake (10.0.3) rdoc (3.12.2) json (~> 1.4) - ref (1.0.2) sass (3.2.6) sass-rails (4.0.0.beta1) railties (>= 4.0.0.beta, < 5.0) @@ -130,9 +127,6 @@ GEM actionpack (>= 3.0) activesupport (>= 3.0) sprockets (~> 2.8) - therubyracer (0.11.4) - libv8 (~> 3.11.8.12) - ref thor (0.17.0) thread_safe (0.1.0) atomic @@ -160,15 +154,15 @@ PLATFORMS DEPENDENCIES coffee-rails (~> 4.0.0.beta1) em-twitter + em-work_queue haml-rails + kaminari mysql2 omniauth-twitter! rails (= 4.0.0.beta1) rails_config sass-rails (~> 4.0.0.beta1) - therubyracer twitter uglifier (>= 1.0.3) unicorn - will_paginate! yajl-ruby diff --git a/app/controllers/sessions_controller.rb b/app/controllers/sessions_controller.rb index a93fb6a..8cc8f50 100644 --- a/app/controllers/sessions_controller.rb +++ b/app/controllers/sessions_controller.rb @@ -1,18 +1,16 @@ +require "socket" + class SessionsController < ApplicationController def callback auth = request.env["omniauth.auth"] - user = Account.find_or_initialize_by(:id => auth["uid"]) + user = Account.find_or_initialize_by(:user_id => auth["uid"]) user.update_attributes(:oauth_token => auth["credentials"]["token"], :oauth_token_secret => auth["credentials"]["secret"]) - session[:user_id] = user.id + session[:user_id] = user.user_id session[:screen_name] = auth["info"]["nickname"] - EM.defer do - EM.connect("127.0.0.1", Settings.worker_port) do |client| - def client.post_init - p data = [:REGISTER, user.id].map(&:to_s).join(" ") - send_data(data) - end - end + + UNIXSocket.open(Settings.register_server_path) do |socket| + socket.write "REGISTER #{user.id}\r\n" end redirect_to root_url diff --git a/app/controllers/users_controller.rb b/app/controllers/users_controller.rb index 5b5995f..1a944f7 100644 --- a/app/controllers/users_controller.rb +++ b/app/controllers/users_controller.rb @@ -7,7 +7,8 @@ class UsersController < ApplicationController @items = user.tweets .where("favorites_count > 0 or retweets_count > 0") .order("COALESCE(favorites_count, 0) + COALESCE(retweets_count, 0) DESC") - .paginate(:page => page, :per_page => Settings.page_per) + .page(page) + .per(Settings.page_per) else @items = [] end @@ -23,7 +24,8 @@ class UsersController < ApplicationController @items = user.tweets .where("favorites_count > 0 or retweets_count > 0") .order("id DESC") - .paginate(:page => page, :per_page => Settings.page_per) + .page(page) + .per(Settings.page_per) else @items = [] end @@ -38,7 +40,8 @@ class UsersController < ApplicationController if user @items = user.tweets .order("id DESC") - .paginate(:page => page, :per_page => Settings.page_per) + .page(page) + .per(Settings.page_per) else @items = [] end @@ -58,7 +61,8 @@ class UsersController < ApplicationController "SELECT tweet_id FROM retweets WHERE user_id = #{user.id}" + ") AS rf)") .order("id DESC") - .paginate(:page => page, :per_page => Settings.page_per) + .page(page) + .per(Settings.page_per) else @items = [] diff --git a/app/views/shared/_tweets.haml b/app/views/shared/_tweets.haml index 57149ed..46ffe37 100644 --- a/app/views/shared/_tweets.haml +++ b/app/views/shared/_tweets.haml @@ -1,4 +1,4 @@ .items = render :partial => "shared/tweet", :collection => items, :as => :item, :locals => {:user_cache => @user_cache} - if items.size > 0 - = will_paginate items + = paginate items diff --git a/client/Gemfile b/client/Gemfile new file mode 100644 index 0000000..1763098 --- /dev/null +++ b/client/Gemfile @@ -0,0 +1,6 @@ +ruby '1.9.3' +source 'https://rubygems.org' + +gem 'settingslogic' +gem 'em-twitter' +gem 'yajl-ruby', :require => "yajl" diff --git a/client/Gemfile.lock b/client/Gemfile.lock new file mode 100644 index 0000000..cf106cc --- /dev/null +++ b/client/Gemfile.lock @@ -0,0 +1,20 @@ +GEM + remote: https://rubygems.org/ + specs: + em-twitter (0.2.1) + eventmachine (~> 1.0) + http_parser.rb (~> 0.5) + simple_oauth (~> 0.1) + eventmachine (1.0.3) + http_parser.rb (0.5.3) + settingslogic (2.0.9) + simple_oauth (0.2.0) + yajl-ruby (1.1.0) + +PLATFORMS + ruby + +DEPENDENCIES + em-twitter + settingslogic + yajl-ruby diff --git a/client/logger.rb b/client/logger.rb new file mode 100644 index 0000000..eb09894 --- /dev/null +++ b/client/logger.rb @@ -0,0 +1,44 @@ +module Aclog + class Logger + def debug(msg) + if @level == :debug + log(@out, "DEBUG", msg) + end + end + + def info(msg) + unless @level == :none + @level == :error + @level == :warn + log(@out, "INFO", msg) + end + end + + def warn(msg) + unless @level == :none + @level == :error + log(@err, "WARN", msg) + end + end + + def error(msg) + unless @level == :none + log(@err, "ERROR", msg) + end + end + + def fatal(msg) + log(@err, "FATAL", msg) + end + + def log(out, type, msg) + out.puts Time.now.utc.iso8601(3) + " " + type + ": " + msg.to_s + end + + def initialize(level = :warn) + @out = STDOUT + @err = STDERR + @level = level + end + end +end diff --git a/client/settings.rb b/client/settings.rb new file mode 100644 index 0000000..5c299dc --- /dev/null +++ b/client/settings.rb @@ -0,0 +1,5 @@ +require "settingslogic" + +class Settings < Settingslogic + source "settings.yml" +end diff --git a/client/settings.yml.default b/client/settings.yml.default new file mode 100644 index 0000000..2f8e787 --- /dev/null +++ b/client/settings.yml.default @@ -0,0 +1,7 @@ +consumer_key: +consumer_secret: +secret_key: +worker_count: +worker_number: +db_proxy_host: +db_proxy_port: diff --git a/client/worker.rb b/client/worker.rb new file mode 100644 index 0000000..2d0cd25 --- /dev/null +++ b/client/worker.rb @@ -0,0 +1,257 @@ +require "time" +require "cgi" +require "em-twitter" +require "yajl" +require "./settings" +require "./logger" + +class Worker + class DBProxyClient < EM::Connection + def initialize + super + @connections = [] + @receive_buf = "" + end + + def format_text_from_hash(hash) + text = hash[:text] + entities = hash[:entities] + + return text unless entities + + gaps = {} + replace = -> ents, bl do + ents.each do |entity| + starts = entity[:indices].first + ends = entity[:indices].last + rep = bl.call(entity) + gaps[starts] = rep.size - (ends - starts) + bgap = gaps.select{|k, v| k < starts}.values.inject(0){|s, m| s += m} + text[starts + bgap...ends + bgap] = rep + end + end + + replace.call((entities[:media] || []) + (entities[:urls] || []), + -> entity {"<url:#{CGI.escapeHTML(entity[:expanded_url])}:#{CGI.escapeHTML(entity[:display_url])}>"}) + replace.call(entities[:hashtags] || [], + -> entity {"<hashtag:#{CGI.escapeHTML(URI.encode(entity[:text]))}>"}) + replace.call(entities[:user_mentions] || [], + -> entity {"<mention:#{CGI.escapeHTML(URI.encode(entity[:screen_name]))}>"}) + + return text + end + + def format_source(source) + source + end + + def send_user(hash) + out = {:id => hash[:id], + :screen_name => hash[:screen_name], + :name => hash[:name], + :profile_image_url => hash[:profile_image_url_https]} + send_data("USER #{Yajl::Encoder.encode(out)}\r\n") + end + + def send_tweet(hash) + send_user(hash[:user]) + out = {:id => hash[:id], + :text => format_text_from_hash(hash), + :source => format_source(hash[:source]), + :tweeted_at => hash[:created_at], + :user_id => hash[:user][:id]} + send_data("TWEET #{Yajl::Encoder.encode(out)}\r\n") + end + + def send_favorite(hash) + send_tweet(hash[:target_object]) + send_user(hash[:source]) + out = {:tweet_id => hash[:target_object][:id], + :user_id => hash[:source][:id]} + send_data("FAVORITE #{Yajl::Encoder.encode(out)}\r\n") + end + + def send_unfavorite(hash) + send_tweet(hash[:target_object]) + send_user(hash[:source]) + out = {:tweet_id => hash[:target_object][:id], + :user_id => hash[:source][:id]} + send_data("UNFAVORITE #{Yajl::Encoder.encode(out)}\r\n") + end + + def send_retweet(hash) + send_tweet(hash[:retweeted_status]) + out = {:id => hash[:id], + :tweet_id => hash[:id], + :user_id => hash[:user][:id]} + send_data("RETWEET #{Yajl::Encoder.encode(out)}\r\n") + end + + def send_delete(hash) + out = {:tweet_id => hash[:delete][:status][:id], + :user_id => hash[:delete][:status][:user_id]} + send_data("DELETE #{Yajl::Encoder.encode(out)}\r\n") + end + + def post_init + send_data("CONNECT #{Settings.secret_key}&#{Settings.worker_number}&#{Settings.worker_count}\r\n") + end + + def unbind + $logger.info("Connection closed") + reconnect(@options[:host], @options[:port]) + end + + def receive_data(data) + @receive_buf << data + while line = @receive_buf.slice!(/.+?\r\n/) + line.chomp! + next if line == "" + arg = line.split(/ /, 2) + case arg.first + when "OK" + $logger.info("Connected to DB Proxy") + when "ERROR" + $logger.error("Error: #{arg.last}") + when "ACCOUNT" + begin + p arg + hash = ::Yajl::Parser.parse(arg.last, :symbolize_keys => true) + con = EM::Twitter::Client.connect({ + :host => "userstream.twitter.com", + :path => "/1.1/user.json", + :oauth => {:consumer_key => Settings.consumer_key, + :consumer_secret => Settings.consumer_secret, + :token => hash[:oauth_token], + :token_secret => hash[:oauth_token_secret]}, + :method => "GET", + # user data + :user_id => hash[:user_id] + }) + + con.on_reconnect do |timeout, count| + $logger.warn("Reconnected: #{con.options[:user_id]}/#{count}") + end + + con.on_max_reconnects do |timeout, count| + $logger.error("Reached Max Reconnects: #{con.options[:user_id]}") + end + + con.on_unauthorized do + $logger.error("Unauthorized: #{con.options[:user_id]}") + @connections.delete(con) + con.stop + end + + con.on_forbidden do + $logger.error("Forbidden: #{con.options[:user_id]}") + @connections.delete(con) + end + + con.on_not_found do + $logger.error("Not Found: #{con.options[:user_id]}") + @connections.delete(con) + end + + con.on_not_acceptable do + $logger.error("Not Acceptable: #{con.options[:user_id]}") + end + + con.on_too_long do + $logger.error("Too Long: #{con.options[:user_id]}") + end + + con.on_range_unacceptable do + $logger.error("Range Unacceptable: #{con.options[:user_id]}") + end + + con.on_enhance_your_calm do + $logger.error("Enhance Your Calm: #{con.options[:user_id]}") + @connections.delete(con) + end + + con.on_error do |message| + $logger.error("Unknown: #{con.options[:user_id]}/#{message}") + end + + con.each do |chunk| + begin # convert error + begin + status = ::Yajl::Parser.parse(chunk, :symbolize_keys => true) + rescue ::Yajl::ParseError + $logger.warn("::Yajl::ParseError in stream: #{chunk}") + next + end + + if status.is_a?(::Hash) + if status.key?(:user) + if status[:user][:id] == con.options[:user_id] && + !status.key?(:retweeted_status) + send_tweet(status) + $logger.debug("Created Tweet") + elsif status.key?(:retweeted_status) && + (status[:retweeted_status][:user][:id] == con.options[:user_id] || + status[:user][:id] == con.options[:user_id]) + send_retweet(status) + $logger.debug("Created Retweet") + end + elsif status[:event] == "favorite" + if status[:target_object][:user] && + (!status[:target_object][:user][:protected] || + status[:target_object][:user][:id] == con.options[:user_id]) + send_favorite(status) + $logger.debug("Created Favorite") + end + elsif status[:event] == "unfavorite" + send_unfavorite(status) + $logger.debug("Destroyed Favorite") + elsif status.key?(:delete) && status[:delete].key?(:status) + send_delete(status) + $logger.debug("Destroyed Tweet: #{status[:delete][:status][:id]}/#{status[:delete][:status][:user_id]}") + else + # monyo + end + else + $logger.warn("Unexpected object in stream: #{status}") + next + end + rescue # debug + $logger.error($!) + $logger.error($@) + end + end + + $logger.info("User connected: #{con.options[:user_id]}") + @connections << con + rescue ::Yajl::ParseError + $logger.error("JSON Parse Error: #{json}") + end + end + end + end + + def stop_all + @connections.map(&:stop) + send_data("QUIT\r\n") + end + end + + def initialize + $logger = Aclog::Logger.new(:debug) + end + + def start + $logger.info("Worker ##{Settings.worker_number} started") + EM.run do + stop = Proc.new do + EM.stop + end + Signal.trap(:INT, &stop) + Signal.trap(:TERM, &stop) + + EM.connect(Settings.db_proxy_host, Settings.db_proxy_port, DBProxyClient) + end + end +end + + diff --git a/config/database.yml b/config/database.yml index b2b0606..d9682a5 100644 --- a/config/database.yml +++ b/config/database.yml @@ -1,16 +1,13 @@ <% #development: - #test: + #production: + # adapter: mysql2 + # encoding: utf8 + # reconnect: true + # database: production + # pool: 5 + # username: <%= ENV["DOTCLOUD_DB_MYSQL_LOGIN"] %> + # password: <%= ENV["DOTCLOUD_DB_MYSQL_PASSWORD"] %> + # host: <%= ENV["DOTCLOUD_DB_MYSQL_HOST"] %> + # port: <%= ENV["DOTCLOUD_DB_MYSQL_PORT"] %> %> - -production: - adapter: mysql2 - encoding: utf8 - reconnect: true - database: production - pool: 5 - username: <%= ENV["DOTCLOUD_DB_MYSQL_LOGIN"] %> - password: <%= ENV["DOTCLOUD_DB_MYSQL_PASSWORD"] %> - host: <%= ENV["DOTCLOUD_DB_MYSQL_HOST"] %> - port: <%= ENV["DOTCLOUD_DB_MYSQL_PORT"] %> - diff --git a/config/locales/en.yml b/config/locales/en.yml index 0653957..8513e9c 100644 --- a/config/locales/en.yml +++ b/config/locales/en.yml @@ -1,23 +1,8 @@ -# Files in the config/locales directory are used for internationalization -# and are automatically loaded by Rails. If you want to use locales other -# than English, add the necessary files in this directory. -# -# To use the locales, use `I18n.t`: -# -# I18n.t 'hello' -# -# In views, this is aliased to just `t`: -# -# <%= t('hello') %> -# -# To use a different locale, set it with `I18n.locale`: -# -# I18n.locale = :es -# -# This would use the information in config/locales/es.yml. -# -# To learn more, please read the Rails Internationalization guide -# available at http://guides.rubyonrails.org/i18n.html. - en: - hello: "Hello world" + views: + pagination: + first: "« First" + last: "Last »" + previous: "‹ Prev" + next: "Next ›" + truncate: "…" diff --git a/config/settings.yml b/config/settings.yml index b93d958..1081845 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -1,12 +1,6 @@ -<% -# dotCloud -env_file = "/home/dotcloud/environment.json" -if File.exists?(env_file) - ENV.update(Hash[JSON.parse(File.read(env_file)).map{|k, v| [k.to_s, v.to_s]}]) -end -%> consumer_key: <%= ENV["CONSUMER_KEY"] %> consumer_secret: <%= ENV["CONSUMER_SECRET"] %> -worker_count: <%= ENV["WORKER_COUNT"] %> -dotcloud_service_name: <%= ENV["DOTCLOUD_SERVICE_NAME"] %> -page_per: 10 +page_per: <%= ENV["ACLOG_PAGE_PER"] %> +secret_key: <%= ENV["ACLOG_SECRET_KEY"] %> +db_proxy_port: <%= ENV["ACLOG_DB_PROXY_PORT"] %> +register_server_path: <%= ENV["ACLOG_REGISTER_SERVER_PATH"] %> diff --git a/config/unicorn.rb b/config/unicorn.rb new file mode 100644 index 0000000..d83c1c0 --- /dev/null +++ b/config/unicorn.rb @@ -0,0 +1,33 @@ +worker_processes 2 + +working_directory File.expand_path("../../", __FILE__) + +listen "/tmp/aclog-unicorn.sock" + +log = "/var/log/rails/unicorn.log" +stderr_path File.expand_path("log/unicorn.log", ENV["RAILS_ROOT"]) +stdout_path File.expand_path("log/unicorn.log", ENV["RAILS_ROOT"]) + +preload_app true + +before_fork do |server, worker| + if defined?(ActiveRecord::Base) + ActiveRecord::Base.connection.disconnect! + end + + old_pid = "#{server.config[:pid]}.old" + unless old_pid == server.pid + begin + Process.kill :QUIT, File.read(old_pid).to_i + rescue Errno::ENOENT, Errno::ESRCH + end + end +end + +after_fork do |server, worker| + if defined?(ActiveRecord::Base) + ActiveRecord::Base.establish_connection + end +end + + diff --git a/db/migrate/20130225123010_create_accounts.rb b/db/migrate/20130225123010_create_accounts.rb index 6856539..9e96020 100644 --- a/db/migrate/20130225123010_create_accounts.rb +++ b/db/migrate/20130225123010_create_accounts.rb @@ -1,10 +1,13 @@ class CreateAccounts < ActiveRecord::Migration def change create_table :accounts do |t| - t.string :oauth_token - t.string :oauth_token_secret + t.integer :user_id, :limit => 8, :null => false + t.string :oauth_token, :null => false + t.string :oauth_token_secret, :null => false t.timestamps end + + add_index :accounts, :user_id, :unique => true end end diff --git a/db/migrate/20130226150329_create_tweets.rb b/db/migrate/20130226150329_create_tweets.rb index 393b26e..3c7138c 100644 --- a/db/migrate/20130226150329_create_tweets.rb +++ b/db/migrate/20130226150329_create_tweets.rb @@ -1,13 +1,15 @@ class CreateTweets < ActiveRecord::Migration def change create_table :tweets do |t| - t.text :text + t.text :text, :null => false t.text :source - t.references :user, :limit => 8 + t.references :user, :limit => 8, :null => false t.datetime :tweeted_at - t.integer :favorites_count - t.integer :retweets_count + t.integer :favorites_count, :default => 0 + t.integer :retweets_count, :default => 0 end + + add_index :tweets, :user_id end end diff --git a/db/migrate/20130226150829_create_favorites.rb b/db/migrate/20130226150829_create_favorites.rb index d8b34fb..c972b92 100644 --- a/db/migrate/20130226150829_create_favorites.rb +++ b/db/migrate/20130226150829_create_favorites.rb @@ -1,8 +1,11 @@ class CreateFavorites < ActiveRecord::Migration def change create_table :favorites do |t| - t.references :tweet, :limit => 8 - t.references :user, :limit => 8 + t.references :tweet, :limit => 8, :null => false + t.references :user, :limit => 8, :null => false end + + add_index :favorites, [:tweet_id, :user_id], :unique => true + add_index :favorites, :tweet_id end end diff --git a/db/migrate/20130226151042_create_retweets.rb b/db/migrate/20130226151042_create_retweets.rb index a2eb3cf..6de664e 100644 --- a/db/migrate/20130226151042_create_retweets.rb +++ b/db/migrate/20130226151042_create_retweets.rb @@ -1,8 +1,10 @@ class CreateRetweets < ActiveRecord::Migration def change create_table :retweets do |t| - t.references :tweet, :limit => 8 - t.references :user, :limit => 8 + t.references :tweet, :limit => 8, :null => false + t.references :user, :limit => 8, :null => false end + + add_index :retweets, :tweet_id end end diff --git a/db/schema.rb b/db/schema.rb index 72fb1e1..4ec571b 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -14,29 +14,37 @@ ActiveRecord::Schema.define(version: 20130226151042) do create_table "accounts", force: true do |t| - t.string "oauth_token" - t.string "oauth_token_secret" + t.integer "user_id", limit: 8, null: false + t.string "oauth_token", null: false + t.string "oauth_token_secret", null: false t.datetime "created_at" t.datetime "updated_at" end + add_index "accounts", ["user_id"], name: "index_accounts_on_user_id", unique: true + create_table "favorites", force: true do |t| - t.integer "tweet_id", limit: 8 - t.integer "user_id", limit: 8 + t.integer "tweet_id", limit: 8, null: false + t.integer "user_id", limit: 8, null: false end + add_index "favorites", ["tweet_id", "user_id"], name: "index_favorites_on_tweet_id_and_user_id", unique: true + add_index "favorites", ["tweet_id"], name: "index_favorites_on_tweet_id" + create_table "retweets", force: true do |t| - t.integer "tweet_id", limit: 8 - t.integer "user_id", limit: 8 + t.integer "tweet_id", limit: 8, null: false + t.integer "user_id", limit: 8, null: false end + add_index "retweets", ["tweet_id"], name: "index_retweets_on_tweet_id" + create_table "tweets", force: true do |t| - t.text "text" + t.text "text", null: false t.text "source" - t.integer "user_id", limit: 8 + t.integer "user_id", limit: 8, null: false t.datetime "tweeted_at" - t.integer "favorites_count" - t.integer "retweets_count" + t.integer "favorites_count", default: 0 + t.integer "retweets_count", default: 0 end create_table "users", force: true do |t| diff --git a/lib/receiver/worker.rb b/lib/receiver/worker.rb index 5d5f60b..01a73cd 100644 --- a/lib/receiver/worker.rb +++ b/lib/receiver/worker.rb @@ -1,252 +1,204 @@ require "time" -class Receiver::Worker - def initialize - @logger = Receiver::Logger.new(:info) - @connections = [] - - _tm = Settings.dotcloud_service_name.split(/_/).last - if _tm =~ /^[0-9]+$/ - @worker_number = Integer(_tm) - else - exit(0) +module EM + class Connection + def send_chunk(data) + send_data(data + "\r\n") end end +end - # Create Aclog format text from Twitter Status Hash - def format_text_from_hash(hash) - text = hash[:text] - entities = hash[:entities] - - return text unless entities - - gaps = {} - replace = -> ents, bl do - ents.each do |entity| - starts = entity[:indices].first - ends = entity[:indices].last - rep = bl.call(entity) - gaps[starts] = rep.size - (ends - starts) - bgap = gaps.select{|k, v| k < starts}.values.inject(0){|s, m| s += m} - text[starts + bgap...ends + bgap] = rep +class Receiver::Worker + class DBProxyServer < EM::Connection + $worker_count = nil + @@wq = EM::WorkQueue::WorkQueue.new do |arg| + begin + json = ::Yajl::Parser.parse(arg.last, :symbolize_keys => true) + rescue ::Yajl::ParseError + # JSON parse error....?? + p $! end - end - replace.call((entities[:media] || []) + (entities[:urls] || []), - -> entity {"<url:#{CGI.escapeHTML(entity[:expanded_url])}:#{CGI.escapeHTML(entity[:display_url])}>"}) - replace.call(entities[:hashtags] || [], - -> entity {"<hashtag:#{CGI.escapeHTML(URI.encode(entity[:text]))}>"}) - replace.call(entities[:user_mentions] || [], - -> entity {"<mention:#{CGI.escapeHTML(URI.encode(entity[:screen_name]))}>"}) - - return text - end - - def rescue_duplicate(&proc) - begin - return proc.call - rescue ActiveRecord::StatementInvalid => e - if e.to_s =~ /^Mysql2::Error: Duplicate entry / - @logger.info("Duplicate entry..") + case arg.first + when "USER" + $logger.debug("Received User") + rec = User.find_or_initialize_by(:id => json[:id]) + rec.screen_name = json[:screen_name] + rec.name = json[:name] + rec.profile_image_url = json[:profile_image_url] + rec.save! if rec.changed? + when "TWEET" + $logger.debug("Received Tweet") + begin + Tweet.create!(:id => json[:id], + :text => json[:text], + :source => json[:source], + :tweeted_at => Time.parse(json[:tweeted_at]), + :user_id => json[:user_id]) + $logger.debug("Saved Tweet") + rescue ActiveRecord::RecordNotUnique + $logger.info("Can't Save Tweet: Duplicate") + end + when "FAVORITE" + $logger.debug("Received Favorite") + begin + Favorite.create!(:tweet_id => json[:tweet_id], + :user_id => json[:user_id]) + $logger.debug("Saved Favorite") + rescue ActiveRecord::RecordNotUnique + $logger.info("Can't Save Tweet: Duplicate") + end + when "UNFAVORITE" + Favorite.delete_all("tweet_id = #{json[:tweet_id]} AND user_id = #{json[:user_id]}") + when "RETWEET" + $logger.debug("Received Retweet") + begin + Retweet.create!(:id => json[:id], + :tweet_id => json[:tweet_id], + :user_id => json[:user_id]) + $logger.debug("Saved Retweet") + rescue ActiveRecord::RecordNotUnique + $logger.info("Can't Save Retweet: Duplicate") + end + when "DELETE" + tweet = Tweet.find_by(:id => json[:id]) || Retweet.find_by(:id => json[:id]) + if tweet + tweet.destroy + end else - raise e + # ??? + puts "???????" end end - end + @@wq.start - # Create or Update user by Twitter User Hash - def create_user_from_hash(user) - rescue_duplicate do - rec = User.find_or_initialize_by(:id => user[:id]) - rec.screen_name = user[:screen_name] - rec.name = user[:name] - rec.profile_image_url = user[:profile_image_url_https] - rec.save! if rec.changed? - - return rec + def initialize + @worker_number = nil + @receive_buf = "" end - end - - # Create tweet by Twitter Status Hash - def create_tweet_from_hash(status) - rescue_duplicate do - Tweet.find_by(:id => status[:id]) || - Tweet.create!(:id => status[:id], - :text => format_text_from_hash(status), - :source => status[:source], - :tweeted_at => Time.parse(status[:created_at]), - :user => create_user_from_hash(status[:user])) - end - end - - def destroy_tweet_from_hash(status) - Tweet.delete(status[:delete][:status][:id]) || - Retweet.delete(status[:delete][:status][:id]) - end - # Create Retweet by Twitter Status Hash - def create_retweet_from_hash(status) - rescue_duplicate do - Retweet.find_by(:id => status[:id]) || - Retweet.create!(:id => status[:id], - :tweet => create_tweet_from_hash(status[:retweeted_status]), - :user => create_user_from_hash(status[:user])) + def post_init + # なにもしない。クライアントが end - end - # Create Favorite by Streaming Event Hash - def create_favorite_from_hash(status) - rescue_duplicate do - user = create_user_from_hash(status[:source]) - user.favorites.find_by(:tweet_id => status[:target_object][:id]) || - user.favorites.create!(:tweet => create_tweet_from_hash(status[:target_object])) + def unbind + $connections.delete_if{|k, v| v == self} + $logger.info("Connection closed") end - end - - def destroy_favorite_from_hash(status) - create_tweet_from_hash(status[:target_object]) - .favorites.where(:user_id => status[:source][:id]).delete_all - end - def start - @logger.info("Worker ##{@worker_number} started") - EM.run do - stop = Proc.new do - @connections.map(&:stop) - EM.stop + def send_account_all + Account.where("user_id % ? = ?", $worker_count, @worker_number).each do |account| + puts "Sent #{account.id}/#{account.user_id}" + send_account(account) end - Signal.trap(:INT, &stop) - Signal.trap(:TERM, &stop) - - register = -> account do - con = EM::Twitter::Client.connect({ - :host => "userstream.twitter.com", - :path => "/1.1/user.json", - :oauth => {:consumer_key => Settings.consumer_key, - :consumer_secret => Settings.consumer_secret, - :token => account.oauth_token, - :token_secret => account.oauth_token_secret}, - :method => "GET", - # user data - :user_id => account.id - }) - - con.on_reconnect do |timeout, count| - @logger.warn("Reconnected: #{con.options[:user_id]}/#{count}") - end - - con.on_max_reconnects do |timeout, count| - @logger.error("Reached Max Reconnects: #{con.options[:user_id]}") - end - - con.on_unauthorized do - @logger.error("Unauthorized: #{con.options[:user_id]}") - @connections.delete(con) - end - - con.on_forbidden do - @logger.error("Forbidden: #{con.options[:user_id]}") - @connections.delete(con) - end - - con.on_not_found do - @logger.error("Not Found: #{con.options[:user_id]}") - @connections.delete(con) - end - - con.on_not_acceptable do - @logger.error("Not Acceptable: #{con.options[:user_id]}") - end - - con.on_too_long do - @logger.error("Too Long: #{con.options[:user_id]}") - end + end - con.on_range_unacceptable do - @logger.error("Range Unacceptable: #{con.options[:user_id]}") - end + def send_account(account) + send_chunk("ACCOUNT #{Yajl::Encoder.encode(account.attributes)}") + end - con.on_enhance_your_calm do - @logger.error("Enhance Your Calm: #{con.options[:user_id]}") - @connections.delete(con) + def receive_data(data) + @receive_buf << data + while line = @receive_buf.slice!(/.+?\r\n/) + line.chomp! + next if line == "" + p line + arg = line.split(/ /, 2) + case arg.first + when "CONNECT" + ff = arg.last.split(/&/) + secret_token = ff[0] + worker_number = ff[1].to_i + worker_count = ff[2].to_i + if secret_token == Settings.secret_key + if $worker_count != worker_count && $connections.size > 0 + $logger.error("Error: Worker Count Difference: $worker_count=#{$worker_count}, worker_count=#{worker_count}") + send_chunk("ERROR Invalid Worker Count") + close_connection_after_writing + else + $worker_count = worker_count + $connections[worker_number] = self + @worker_number = worker_number + $logger.info("Connected: #{worker_number}") + send_chunk("OK Connected") + send_account_all + end + else + $logger.error("Error: Invalid Secret Key") + send_chunk("ERROR Invalid Secret Token") + close_connection_after_writing + end + when "QUIT" + send_chunk("BYE") + close_connection_after_writing + else + @@wq.push arg end + end + end + end - con.on_error do |message| - @logger.error("Unknown: #{con.options[:user_id]}/#{message}") - end + class RegisterServer < EM::Connection + def initialize + @receive_buf = "" + end - con.each do |json| - begin # convert error - begin - status = ::Yajl::Parser.parse(json, :symbolize_keys => true) - rescue ::Yajl::ParseError - @logger.warn("::Yajl::ParseError in stream: #{json}") - next - end + def post_init + p "connected" + end - if status.is_a?(::Hash) - if status.key?(:user) - # Tweet or Retweet - if status[:user][:id] == con.options[:user_id] && - !status.key?(:retweeted_status) - # Tweet - create_tweet_from_hash(status) - @logger.debug("Created Tweet") - elsif status.key?(:retweeted_status) && - (status[:retweeted_status][:user][:id] == con.options[:user_id] || - status[:user][:id] == con.options[:user_id]) - # Retweet - create_retweet_from_hash(status) - @logger.debug("Created Retweet") - end - elsif status[:event] == "favorite" - # Favorite - create_favorite_from_hash(status) - @logger.debug("Created Favorite") - elsif status[:event] == "unfavorite" - # Unfavorite - destroy_favorite_from_hash(status) - @logger.debug("Destroyed Favorite") - elsif status.key?(:delete) && status[:delete].key?(:status) - # Delete - destroy_tweet_from_hash(status) - @logger.debug("Destroyed Tweet") + def receive_data(data) + @receive_buf << data + while line = @receive_buf.slice!(/.+?\r\n/) + line.chomp! + next if line == "" + p line + sp = line.split(/ /, 2) + if sp.first == "REGISTER" + if sp.last =~ /^[0-9]+$/ + account = Account.find_by(:id => sp.last.to_i) + if account + if con = $connections[account.id % $worker_count] + con.send_account(account) + send_chunk("OK Registered") else - # Else - do nothing - # p status + send_chunk("OK Worker not found") end else - @logger.warn("Unexpected object in stream: #{status}") - next + $logger.error("Unknown account: #{sp.last}") + send_chunk("ERROR Unknown Account") end - rescue # debug - @logger.error($!) - @logger.error($@) - end - end - - @logger.info("User connected: #{con.options[:user_id]}") - @connections << con - end - - reconnect = -> do - Account.where("id % #{Settings.worker_count} = #{@worker_number}").each do |account| - #Account.find_by_sql("SELECT * FROM accounts WHERE id % #{Settings.worker_count} = #{@worker_number}").each do |account| - if con = @connections.find{|m| m.options[:user_id] == account.id} - con.immediate_reconnect else - register.call(account) + $logger.error("Invalid User ID") + send_chunk("ERROR Invalid User ID") end + else + $logger.error("Unknown Command: #{sp})") + send_chunk("ERROR Unknown command") end + close_connection_after_writing + return end + end + end + + def initialize + $logger = Receiver::Logger.new(:info) + $connections = {} + end - EM.add_periodic_timer(30 * 60) do - reconnect.call + def start + $logger.info("Database Proxy Started") + EM.run do + stop = Proc.new do + EM.stop end + Signal.trap(:INT, &stop) + Signal.trap(:TERM, &stop) - reconnect.call + EM.start_server("0.0.0.0", Settings.db_proxy_port, DBProxyServer) + EM.start_unix_domain_server(Settings.register_server_path, RegisterServer) end end end - diff --git a/worker.sh b/worker.sh deleted file mode 100755 index 837ab11..0000000 --- a/worker.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/sh - -bundle exec rails runner Receiver::Worker.new.start - - |